diff --git a/plexpy/__init__.py b/plexpy/__init__.py index 1e3e8c08..cee06bcb 100644 --- a/plexpy/__init__.py +++ b/plexpy/__init__.py @@ -381,7 +381,7 @@ def start(): # Start background notification thread if any([CONFIG.MOVIE_NOTIFY_ENABLE, CONFIG.TV_NOTIFY_ENABLE, CONFIG.MUSIC_NOTIFY_ENABLE, CONFIG.NOTIFY_RECENTLY_ADDED]): - notification_handler.start_thread(num_threads=3) + notification_handler.start_threads(num_threads=3) started = True diff --git a/plexpy/activity_handler.py b/plexpy/activity_handler.py index 199fae11..f3172cef 100644 --- a/plexpy/activity_handler.py +++ b/plexpy/activity_handler.py @@ -74,16 +74,13 @@ class ActivityHandler(object): session = self.get_live_session() - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=session, notify_action='on_play')) + plexpy.NOTIFY_QUEUE.put({'stream_data': session, 'notify_action': 'on_play'}) # Write the new session to our temp session table self.update_db_session(session=session) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=session, notify_action='on_concurrent')) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=session, notify_action='on_newdevice')) + plexpy.NOTIFY_QUEUE.put({'stream_data': session, 'notify_action': 'on_concurrent'}) + plexpy.NOTIFY_QUEUE.put({'stream_data': session, 'notify_action': 'on_newdevice'}) def on_stop(self, force_stop=False): if self.is_valid_session(): @@ -104,8 +101,7 @@ class ActivityHandler(object): # Retrieve the session data from our temp table db_session = ap.get_session_by_key(session_key=self.get_session_key()) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=db_session, notify_action='on_stop')) + plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_stop'}) # Write it to the history table monitor_proc = activity_processor.ActivityProcessor() @@ -132,8 +128,7 @@ class ActivityHandler(object): # Retrieve the session data from our temp table db_session = ap.get_session_by_key(session_key=self.get_session_key()) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=db_session, notify_action='on_pause')) + plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_pause'}) def on_resume(self): if self.is_valid_session(): @@ -151,8 +146,7 @@ class ActivityHandler(object): # Retrieve the session data from our temp table db_session = ap.get_session_by_key(session_key=self.get_session_key()) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=db_session, notify_action='on_resume')) + plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_resume'}) def on_buffer(self): if self.is_valid_session(): @@ -181,8 +175,7 @@ class ActivityHandler(object): time_since_last_trigger == 0 or time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT): ap.set_session_buffer_trigger_time(session_key=self.get_session_key()) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=db_session, notify_action='on_buffer')) + plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_buffer'}) # This function receives events from our websocket connection def process(self): @@ -229,8 +222,7 @@ class ActivityHandler(object): notify_states = notification_handler.get_notify_state(session=db_session) if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \ and not any(d['notify_action'] == 'on_watched' for d in notify_states): - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=db_session, notify_action='on_watched')) + plexpy.NOTIFY_QUEUE.put({'stream_data': db_session, 'notify_action': 'on_watched'}) else: # We don't have this session in our table yet, start a new one. diff --git a/plexpy/activity_pinger.py b/plexpy/activity_pinger.py index a9689c3c..8ca85dde 100644 --- a/plexpy/activity_pinger.py +++ b/plexpy/activity_pinger.py @@ -48,8 +48,7 @@ def check_active_sessions(ws_request=False): if int_ping_count >= 3: logger.info(u"PlexPy Monitor :: The Plex Media Server is back up.") - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - notify_action='on_intup')) + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'}) int_ping_count = 0 @@ -70,14 +69,12 @@ def check_active_sessions(ws_request=False): if session['state'] == 'paused': logger.debug(u"PlexPy Monitor :: Session %s has been paused." % stream['session_key']) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=stream, notify_action='on_pause')) + plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_pause'}) if session['state'] == 'playing' and stream['state'] == 'paused': logger.debug(u"PlexPy Monitor :: Session %s has been resumed." % stream['session_key']) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=stream, notify_action='on_resume')) + plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_resume'}) if stream['state'] == 'paused' and not ws_request: # The stream is still paused so we need to increment the paused_counter @@ -115,8 +112,7 @@ def check_active_sessions(ws_request=False): 'WHERE session_key = ? AND rating_key = ?', [stream['session_key'], stream['rating_key']]) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=stream, notify_action='on_buffer')) + plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_buffer'}) else: # Subsequent buffer notifications after wait time @@ -130,8 +126,7 @@ def check_active_sessions(ws_request=False): 'WHERE session_key = ? AND rating_key = ?', [stream['session_key'], stream['rating_key']]) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=stream, notify_action='on_buffer')) + plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_buffer'}) logger.debug(u"PlexPy Monitor :: Session %s is buffering. Count is now %s. Last triggered %s." % (stream['session_key'], @@ -146,8 +141,7 @@ def check_active_sessions(ws_request=False): notify_states = notification_handler.get_notify_state(session=session) if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \ and not any(d['notify_action'] == 'on_watched' for d in notify_states): - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=stream, notify_action='on_watched')) + plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_watched'}) else: # The user has stopped playing a stream @@ -165,11 +159,9 @@ def check_active_sessions(ws_request=False): notify_states = notification_handler.get_notify_state(session=stream) if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT \ and not any(d['notify_action'] == 'on_watched' for d in notify_states): - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=stream, notify_action='on_watched')) + plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_watched'}) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=stream, notify_action='on_stop')) + plexpy.NOTIFY_QUEUE.put({'stream_data': stream, 'notify_action': 'on_stop'}) # Write the item history on playback stop success = monitor_process.write_session_history(session=stream) @@ -221,8 +213,7 @@ def check_active_sessions(ws_request=False): % str(int_ping_count)) if int_ping_count == 3: - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - notify_action='on_intdown')) + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'}) def check_recently_added(): @@ -275,8 +266,7 @@ def check_recently_added(): if 0 < time_threshold - int(item['added_at']) <= time_interval: logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key'])) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - timeline_data=item, notify_action='on_created')) + plexpy.NOTIFY_QUEUE.put({'timeline_data': item, 'notify_action': 'on_created'}) else: item = max(metadata, key=lambda x:x['added_at']) @@ -294,8 +284,7 @@ def check_recently_added(): logger.debug(u"PlexPy Monitor :: Library item %s has been added to Plex." % str(item['rating_key'])) # Check if any notification agents have notifications enabled - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - timeline_data=item, notify_action='on_created')) + plexpy.NOTIFY_QUEUE.put({'timeline_data': item, 'notify_action': 'on_created'}) def check_server_response(): @@ -327,14 +316,12 @@ def check_server_response(): if ext_ping_count >= 3: logger.info(u"PlexPy Monitor :: Plex remote access is back up.") - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - notify_action='on_extup')) + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_extup'}) ext_ping_count = 0 if ext_ping_count == 3: - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - notify_action='on_extdown')) + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_extdown'}) def check_server_updates(): @@ -351,8 +338,7 @@ def check_server_updates(): if download_info['update_available']: logger.info(u"PlexPy Monitor :: PMS update available version: %s", download_info['version']) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - notify_action='on_pmsupdate')) + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_pmsupdate'}) else: logger.info(u"PlexPy Monitor :: No PMS update available.") \ No newline at end of file diff --git a/plexpy/activity_processor.py b/plexpy/activity_processor.py index 64dd8bb6..351b8d69 100644 --- a/plexpy/activity_processor.py +++ b/plexpy/activity_processor.py @@ -99,8 +99,7 @@ class ActivityProcessor(object): # Check if any notification agents have notifications enabled if notify: values.update({'ip_address': session['ip_address']}) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=values, notify_action='on_play')) + plexpy.NOTIFY_QUEUE.put({'stream_data': values, 'notify_action': 'on_play'}) # If it's our first write then time stamp it. started = int(time.time()) @@ -116,10 +115,8 @@ class ActivityProcessor(object): self.db.upsert('sessions', ip_address, keys) if notify: - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=values, notify_action='on_concurrent')) - plexpy.NOTIFY_QUEUE.put(notification_handler.add_to_queue( - stream_data=values, notify_action='on_newdevice')) + plexpy.NOTIFY_QUEUE.put({'stream_data': values, 'notify_action': 'on_concurrent'}) + plexpy.NOTIFY_QUEUE.put({'stream_data': values, 'notify_action': 'on_newdevice'}) return True diff --git a/plexpy/notification_handler.py b/plexpy/notification_handler.py index 73a21c59..c8147af1 100644 --- a/plexpy/notification_handler.py +++ b/plexpy/notification_handler.py @@ -36,11 +36,16 @@ import users def process_queue(): queue = plexpy.NOTIFY_QUEUE while True: - queue.get() + params = queue.get() + if params: + if 'notifier_id' in params: + notify(**params) + else: + add_notifier_each(**params) queue.task_done() -def start_thread(num_threads=1): +def start_threads(num_threads=1): logger.info(u"PlexPy NotificationHandler :: Starting background notification handler.") for x in range(num_threads): thread = threading.Thread(target=process_queue) @@ -48,7 +53,7 @@ def start_thread(num_threads=1): thread.start() -def add_to_queue(notify_action=None, stream_data=None, timeline_data=None): +def add_notifier_each(notify_action=None, stream_data=None, timeline_data=None): if not notify_action: logger.debug(u"PlexPy NotificationHandler :: Notify called but no action received.") return @@ -64,10 +69,10 @@ def add_to_queue(notify_action=None, stream_data=None, timeline_data=None): stream_data=stream_data, timeline_data=timeline_data) if conditions: - plexpy.NOTIFY_QUEUE.put(notify(notifier_id=notifier['id'], - notify_action=notify_action, - stream_data=stream_data, - timeline_data=timeline_data)) + plexpy.NOTIFY_QUEUE.put({'notifier_id': notifier['id'], + 'notify_action': notify_action, + 'stream_data': stream_data, + 'timeline_data': timeline_data}) def notify_conditions(notifier=None, notify_action=None, stream_data=None, timeline_data=None): if stream_data: