mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-08 06:00:51 -07:00
Multithreaded notification queue
This commit is contained in:
parent
82f4c99025
commit
08a8b5fee0
5 changed files with 38 additions and 58 deletions
|
@ -381,7 +381,7 @@ def start():
|
||||||
# Start background notification thread
|
# Start background notification thread
|
||||||
if any([CONFIG.MOVIE_NOTIFY_ENABLE, CONFIG.TV_NOTIFY_ENABLE,
|
if any([CONFIG.MOVIE_NOTIFY_ENABLE, CONFIG.TV_NOTIFY_ENABLE,
|
||||||
CONFIG.MUSIC_NOTIFY_ENABLE, CONFIG.NOTIFY_RECENTLY_ADDED]):
|
CONFIG.MUSIC_NOTIFY_ENABLE, CONFIG.NOTIFY_RECENTLY_ADDED]):
|
||||||
notification_handler.start_thread(num_threads=3)
|
notification_handler.start_threads(num_threads=3)
|
||||||
|
|
||||||
started = True
|
started = True
|
||||||
|
|
||||||
|
|
|
@ -74,16 +74,13 @@ class ActivityHandler(object):
|
||||||
|
|
||||||
session = self.get_live_session()
|
session = self.get_live_session()
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': session, 'notify_action': 'on_play'})
|
||||||
stream_data=session, notify_action='on_play'))
|
|
||||||
|
|
||||||
# Write the new session to our temp session table
|
# Write the new session to our temp session table
|
||||||
self.update_db_session(session=session)
|
self.update_db_session(session=session)
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': session, 'notify_action': 'on_concurrent'})
|
||||||
stream_data=session, notify_action='on_concurrent'))
|
plexpy.NOTIFY_QUEUE.put({'stream_data': session, 'notify_action': 'on_newdevice'})
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
|
||||||
stream_data=session, notify_action='on_newdevice'))
|
|
||||||
|
|
||||||
def on_stop(self, force_stop=False):
|
def on_stop(self, force_stop=False):
|
||||||
if self.is_valid_session():
|
if self.is_valid_session():
|
||||||
|
@ -104,8 +101,7 @@ class ActivityHandler(object):
|
||||||
# Retrieve the session data from our temp table
|
# Retrieve the session data from our temp table
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_stop'})
|
||||||
stream_data=db_session, notify_action='on_stop'))
|
|
||||||
|
|
||||||
# Write it to the history table
|
# Write it to the history table
|
||||||
monitor_proc = activity_processor.ActivityProcessor()
|
monitor_proc = activity_processor.ActivityProcessor()
|
||||||
|
@ -132,8 +128,7 @@ class ActivityHandler(object):
|
||||||
# Retrieve the session data from our temp table
|
# Retrieve the session data from our temp table
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_pause'})
|
||||||
stream_data=db_session, notify_action='on_pause'))
|
|
||||||
|
|
||||||
def on_resume(self):
|
def on_resume(self):
|
||||||
if self.is_valid_session():
|
if self.is_valid_session():
|
||||||
|
@ -151,8 +146,7 @@ class ActivityHandler(object):
|
||||||
# Retrieve the session data from our temp table
|
# Retrieve the session data from our temp table
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_resume'})
|
||||||
stream_data=db_session, notify_action='on_resume'))
|
|
||||||
|
|
||||||
def on_buffer(self):
|
def on_buffer(self):
|
||||||
if self.is_valid_session():
|
if self.is_valid_session():
|
||||||
|
@ -181,8 +175,7 @@ class ActivityHandler(object):
|
||||||
time_since_last_trigger == 0 or time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT):
|
time_since_last_trigger == 0 or time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT):
|
||||||
ap.set_session_buffer_trigger_time(session_key=self.get_session_key())
|
ap.set_session_buffer_trigger_time(session_key=self.get_session_key())
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_buffer'})
|
||||||
stream_data=db_session, notify_action='on_buffer'))
|
|
||||||
|
|
||||||
# This function receives events from our websocket connection
|
# This function receives events from our websocket connection
|
||||||
def process(self):
|
def process(self):
|
||||||
|
@ -229,8 +222,7 @@ class ActivityHandler(object):
|
||||||
notify_states = notification_handler.get_notify_state(session=db_session)
|
notify_states = notification_handler.get_notify_state(session=db_session)
|
||||||
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
|
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
|
||||||
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
|
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_watched'})
|
||||||
stream_data=db_session, notify_action='on_watched'))
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# We don't have this session in our table yet, start a new one.
|
# We don't have this session in our table yet, start a new one.
|
||||||
|
|
|
@ -48,8 +48,7 @@ def check_active_sessions(ws_request=False):
|
||||||
if int_ping_count >= 3:
|
if int_ping_count >= 3:
|
||||||
logger.info(u"PlexPy Monitor :: The Plex Media Server is back up.")
|
logger.info(u"PlexPy Monitor :: The Plex Media Server is back up.")
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'})
|
||||||
notify_action='on_intup'))
|
|
||||||
|
|
||||||
int_ping_count = 0
|
int_ping_count = 0
|
||||||
|
|
||||||
|
@ -70,14 +69,12 @@ def check_active_sessions(ws_request=False):
|
||||||
if session['state'] == 'paused':
|
if session['state'] == 'paused':
|
||||||
logger.debug(u"PlexPy Monitor :: Session %s has been paused." % stream['session_key'])
|
logger.debug(u"PlexPy Monitor :: Session %s has been paused." % stream['session_key'])
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_pause'})
|
||||||
stream_data=stream, notify_action='on_pause'))
|
|
||||||
|
|
||||||
if session['state'] == 'playing' and stream['state'] == 'paused':
|
if session['state'] == 'playing' and stream['state'] == 'paused':
|
||||||
logger.debug(u"PlexPy Monitor :: Session %s has been resumed." % stream['session_key'])
|
logger.debug(u"PlexPy Monitor :: Session %s has been resumed." % stream['session_key'])
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_resume'})
|
||||||
stream_data=stream, notify_action='on_resume'))
|
|
||||||
|
|
||||||
if stream['state'] == 'paused' and not ws_request:
|
if stream['state'] == 'paused' and not ws_request:
|
||||||
# The stream is still paused so we need to increment the paused_counter
|
# The stream is still paused so we need to increment the paused_counter
|
||||||
|
@ -115,8 +112,7 @@ def check_active_sessions(ws_request=False):
|
||||||
'WHERE session_key = ? AND rating_key = ?',
|
'WHERE session_key = ? AND rating_key = ?',
|
||||||
[stream['session_key'], stream['rating_key']])
|
[stream['session_key'], stream['rating_key']])
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_buffer'})
|
||||||
stream_data=stream, notify_action='on_buffer'))
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Subsequent buffer notifications after wait time
|
# Subsequent buffer notifications after wait time
|
||||||
|
@ -130,8 +126,7 @@ def check_active_sessions(ws_request=False):
|
||||||
'WHERE session_key = ? AND rating_key = ?',
|
'WHERE session_key = ? AND rating_key = ?',
|
||||||
[stream['session_key'], stream['rating_key']])
|
[stream['session_key'], stream['rating_key']])
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_buffer'})
|
||||||
stream_data=stream, notify_action='on_buffer'))
|
|
||||||
|
|
||||||
logger.debug(u"PlexPy Monitor :: Session %s is buffering. Count is now %s. Last triggered %s."
|
logger.debug(u"PlexPy Monitor :: Session %s is buffering. Count is now %s. Last triggered %s."
|
||||||
% (stream['session_key'],
|
% (stream['session_key'],
|
||||||
|
@ -146,8 +141,7 @@ def check_active_sessions(ws_request=False):
|
||||||
notify_states = notification_handler.get_notify_state(session=session)
|
notify_states = notification_handler.get_notify_state(session=session)
|
||||||
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
|
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
|
||||||
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
|
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_watched'})
|
||||||
stream_data=stream, notify_action='on_watched'))
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# The user has stopped playing a stream
|
# The user has stopped playing a stream
|
||||||
|
@ -165,11 +159,9 @@ def check_active_sessions(ws_request=False):
|
||||||
notify_states = notification_handler.get_notify_state(session=stream)
|
notify_states = notification_handler.get_notify_state(session=stream)
|
||||||
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
|
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \
|
||||||
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
|
and not any(d['notify_action'] == 'on_watched' for d in notify_states):
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_watched'})
|
||||||
stream_data=stream, notify_action='on_watched'))
|
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_stop'})
|
||||||
stream_data=stream, notify_action='on_stop'))
|
|
||||||
|
|
||||||
# Write the item history on playback stop
|
# Write the item history on playback stop
|
||||||
success = monitor_process.write_session_history(session=stream)
|
success = monitor_process.write_session_history(session=stream)
|
||||||
|
@ -221,8 +213,7 @@ def check_active_sessions(ws_request=False):
|
||||||
% str(int_ping_count))
|
% str(int_ping_count))
|
||||||
|
|
||||||
if int_ping_count == 3:
|
if int_ping_count == 3:
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
|
||||||
notify_action='on_intdown'))
|
|
||||||
|
|
||||||
|
|
||||||
def check_recently_added():
|
def check_recently_added():
|
||||||
|
@ -275,8 +266,7 @@ def check_recently_added():
|
||||||
if 0 < time_threshold - int(item['added_at']) <= time_interval:
|
if 0 < time_threshold - int(item['added_at']) <= time_interval:
|
||||||
logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key']))
|
logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key']))
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'timeline_data': item, 'notify_action': 'on_created'})
|
||||||
timeline_data=item, notify_action='on_created'))
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
item = max(metadata, key=lambda x:x['added_at'])
|
item = max(metadata, key=lambda x:x['added_at'])
|
||||||
|
@ -294,8 +284,7 @@ def check_recently_added():
|
||||||
logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key']))
|
logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key']))
|
||||||
|
|
||||||
# Check if any notification agents have notifications enabled
|
# Check if any notification agents have notifications enabled
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'timeline_data': item, 'notify_action': 'on_created'})
|
||||||
timeline_data=item, notify_action='on_created'))
|
|
||||||
|
|
||||||
|
|
||||||
def check_server_response():
|
def check_server_response():
|
||||||
|
@ -327,14 +316,12 @@ def check_server_response():
|
||||||
if ext_ping_count >= 3:
|
if ext_ping_count >= 3:
|
||||||
logger.info(u"PlexPy Monitor :: Plex remote access is back up.")
|
logger.info(u"PlexPy Monitor :: Plex remote access is back up.")
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_extup'})
|
||||||
notify_action='on_extup'))
|
|
||||||
|
|
||||||
ext_ping_count = 0
|
ext_ping_count = 0
|
||||||
|
|
||||||
if ext_ping_count == 3:
|
if ext_ping_count == 3:
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_extdown'})
|
||||||
notify_action='on_extdown'))
|
|
||||||
|
|
||||||
|
|
||||||
def check_server_updates():
|
def check_server_updates():
|
||||||
|
@ -351,8 +338,7 @@ def check_server_updates():
|
||||||
if download_info['update_available']:
|
if download_info['update_available']:
|
||||||
logger.info(u"PlexPy Monitor :: PMS update available version: %s", download_info['version'])
|
logger.info(u"PlexPy Monitor :: PMS update available version: %s", download_info['version'])
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_pmsupdate'})
|
||||||
notify_action='on_pmsupdate'))
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
logger.info(u"PlexPy Monitor :: No PMS update available.")
|
logger.info(u"PlexPy Monitor :: No PMS update available.")
|
|
@ -99,8 +99,7 @@ class ActivityProcessor(object):
|
||||||
# Check if any notification agents have notifications enabled
|
# Check if any notification agents have notifications enabled
|
||||||
if notify:
|
if notify:
|
||||||
values.update({'ip_address': session['ip_address']})
|
values.update({'ip_address': session['ip_address']})
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': values, 'notify_action': 'on_play'})
|
||||||
stream_data=values, notify_action='on_play'))
|
|
||||||
|
|
||||||
# If it's our first write then time stamp it.
|
# If it's our first write then time stamp it.
|
||||||
started = int(time.time())
|
started = int(time.time())
|
||||||
|
@ -116,10 +115,8 @@ class ActivityProcessor(object):
|
||||||
self.db.upsert('sessions', ip_address, keys)
|
self.db.upsert('sessions', ip_address, keys)
|
||||||
|
|
||||||
if notify:
|
if notify:
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
plexpy.NOTIFY_QUEUE.put({'stream_data': values, 'notify_action': 'on_concurrent'})
|
||||||
stream_data=values, notify_action='on_concurrent'))
|
plexpy.NOTIFY_QUEUE.put({'stream_data': values, 'notify_action': 'on_newdevice'})
|
||||||
plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue(
|
|
||||||
stream_data=values, notify_action='on_newdevice'))
|
|
||||||
|
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
|
@ -36,11 +36,16 @@ import users
|
||||||
def process_queue():
|
def process_queue():
|
||||||
queue = plexpy.NOTIFY_QUEUE
|
queue = plexpy.NOTIFY_QUEUE
|
||||||
while True:
|
while True:
|
||||||
queue.get()
|
params = queue.get()
|
||||||
|
if params:
|
||||||
|
if 'notifier_id' in params:
|
||||||
|
notify(**params)
|
||||||
|
else:
|
||||||
|
add_notifier_each(**params)
|
||||||
queue.task_done()
|
queue.task_done()
|
||||||
|
|
||||||
|
|
||||||
def start_thread(num_threads=1):
|
def start_threads(num_threads=1):
|
||||||
logger.info(u"PlexPy NotificationHandler :: Starting background notification handler.")
|
logger.info(u"PlexPy NotificationHandler :: Starting background notification handler.")
|
||||||
for x in range(num_threads):
|
for x in range(num_threads):
|
||||||
thread = threading.Thread(target=process_queue)
|
thread = threading.Thread(target=process_queue)
|
||||||
|
@ -48,7 +53,7 @@ def start_thread(num_threads=1):
|
||||||
thread.start()
|
thread.start()
|
||||||
|
|
||||||
|
|
||||||
def add_to_queue(notify_action=None, stream_data=None, timeline_data=None):
|
def add_notifier_each(notify_action=None, stream_data=None, timeline_data=None):
|
||||||
if not notify_action:
|
if not notify_action:
|
||||||
logger.debug(u"PlexPy NotificationHandler :: Notify called but no action received.")
|
logger.debug(u"PlexPy NotificationHandler :: Notify called but no action received.")
|
||||||
return
|
return
|
||||||
|
@ -64,10 +69,10 @@ def add_to_queue(notify_action=None, stream_data=None, timeline_data=None):
|
||||||
stream_data=stream_data,
|
stream_data=stream_data,
|
||||||
timeline_data=timeline_data)
|
timeline_data=timeline_data)
|
||||||
if conditions:
|
if conditions:
|
||||||
plexpy.NOTIFY_QUEUE.put(notify(notifier_id=notifier['id'],
|
plexpy.NOTIFY_QUEUE.put({'notifier_id': notifier['id'],
|
||||||
notify_action=notify_action,
|
'notify_action': notify_action,
|
||||||
stream_data=stream_data,
|
'stream_data': stream_data,
|
||||||
timeline_data=timeline_data))
|
'timeline_data': timeline_data})
|
||||||
|
|
||||||
def notify_conditions(notifier=None, notify_action=None, stream_data=None, timeline_data=None):
|
def notify_conditions(notifier=None, notify_action=None, stream_data=None, timeline_data=None):
|
||||||
if stream_data:
|
if stream_data:
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue