From eb3c189ab6dff7e7072391e3345ea5d37e46f40e Mon Sep 17 00:00:00 2001 From: JonnyWong16 Date: Sat, 8 Oct 2016 21:48:27 -0700 Subject: [PATCH] Rework task scheduler for websocket only --- PlexPy.py | 1 - plexpy/__init__.py | 34 ++++++++++++++++++++++++++-------- plexpy/activity_pinger.py | 30 +++++++++--------------------- plexpy/config.py | 2 ++ plexpy/plexivity_import.py | 2 +- plexpy/plexwatch_import.py | 2 +- plexpy/web_socket.py | 26 +++++++++++++++++++------- 7 files changed, 58 insertions(+), 39 deletions(-) diff --git a/PlexPy.py b/PlexPy.py index 09725088..e82d48a8 100755 --- a/PlexPy.py +++ b/PlexPy.py @@ -194,7 +194,6 @@ def main(): web_socket.start_thread() except: logger.warn(u"Websocket :: Unable to open connection.") - plexpy.initialize_scheduler() # Force the http port if neccessary if args.port: diff --git a/plexpy/__init__.py b/plexpy/__init__.py index bed1ce12..c3bb0780 100644 --- a/plexpy/__init__.py +++ b/plexpy/__init__.py @@ -85,6 +85,7 @@ HTTP_ROOT = None DEV = False WS_CONNECTED = False +PLEX_SERVER_UP = True def initialize(config_file): @@ -310,15 +311,15 @@ def initialize_scheduler(): monitor_seconds = CONFIG.MONITORING_INTERVAL if CONFIG.MONITORING_INTERVAL >= 30 else 30 #schedule_job(activity_pinger.check_active_sessions, 'Check for active sessions', - # hours=0, minutes=0, seconds=1) + # hours=0, minutes=0, seconds=1) #schedule_job(activity_pinger.check_recently_added, 'Check for recently added items', - # hours=0, minutes=0, seconds=monitor_seconds * bool(CONFIG.NOTIFY_RECENTLY_ADDED)) + # hours=0, minutes=0, seconds=monitor_seconds * bool(CONFIG.NOTIFY_RECENTLY_ADDED)) schedule_job(plextv.get_real_pms_url, 'Refresh Plex server URLs', hours=12, minutes=0, seconds=0) schedule_job(pmsconnect.get_server_friendly_name, 'Refresh Plex server name', hours=12, minutes=0, seconds=0) - schedule_job(activity_pinger.check_server_response, 'Check for Plex remote access', + schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access', hours=0, minutes=0, seconds=monitor_seconds * bool(CONFIG.MONITOR_REMOTE_ACCESS)) schedule_job(activity_pinger.check_server_updates, 'Check for Plex updates', hours=12 * bool(CONFIG.MONITOR_PMS_UPDATES), minutes=0, seconds=0) @@ -329,13 +330,32 @@ def initialize_scheduler(): schedule_job(plextv.refresh_users, 'Refresh users list', hours=user_hours, minutes=0, seconds=0) - schedule_job(pmsconnect.refresh_libraries, 'Refresh libraries list', hours=library_hours, minutes=0, seconds=0) else: - ## Add new taks to check and reconnect websocket - pass + # Cancel all jobs + schedule_job(plextv.get_real_pms_url, 'Refresh Plex server URLs', + hours=0, minutes=0, seconds=0) + schedule_job(pmsconnect.get_server_friendly_name, 'Refresh Plex server name', + hours=0, minutes=0, seconds=0) + + schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access', + hours=0, minutes=0, seconds=0) + schedule_job(activity_pinger.check_server_updates, 'Check for Plex updates', + hours=0, minutes=0, seconds=0) + + schedule_job(plextv.refresh_users, 'Refresh users list', + hours=0, minutes=0, seconds=0) + schedule_job(pmsconnect.refresh_libraries, 'Refresh libraries list', + hours=0, minutes=0, seconds=0) + + # Schedule job to reconnect websocket + response_seconds = CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS * CONFIG.WEBSOCKET_CONNECTION_TIMEOUT + response_seconds = 60 if response_seconds < 60 else response_seconds + + schedule_job(activity_pinger.check_server_response, 'Check server response', + hours=0, minutes=0, seconds=response_seconds) # Start scheduler if start_jobs and len(SCHED.get_jobs()): @@ -344,8 +364,6 @@ def initialize_scheduler(): except Exception as e: logger.info(e) - # Debug - #SCHED.print_jobs() def schedule_job(function, name, hours=0, minutes=0, seconds=0, args=None): """ diff --git a/plexpy/activity_pinger.py b/plexpy/activity_pinger.py index 64e2f373..04c73fd8 100644 --- a/plexpy/activity_pinger.py +++ b/plexpy/activity_pinger.py @@ -26,6 +26,7 @@ import notification_handler import notifiers import plextv import pmsconnect +import web_socket monitor_lock = threading.Lock() @@ -42,16 +43,7 @@ def check_active_sessions(ws_request=False): monitor_process = activity_processor.ActivityProcessor() # logger.debug(u"PlexPy Monitor :: Checking for active streams.") - global int_ping_count - if session_list: - if int_ping_count >= 3: - logger.info(u"PlexPy Monitor :: The Plex Media Server is back up.") - - plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'}) - - int_ping_count = 0 - media_container = session_list['sessions'] # Check our temp table for what we must do with the new streams @@ -203,18 +195,6 @@ def check_active_sessions(ws_request=False): else: logger.debug(u"PlexPy Monitor :: Unable to read session list.") - if int_ping_count == 0: - # Temporarily set the stopped time for all sessions - stopped_time = int(time.time()) - monitor_db.action('UPDATE sessions SET stopped = ?', [stopped_time]) - - int_ping_count += 1 - logger.warn(u"PlexPy Monitor :: Unable to get an internal response from the server, ping attempt %s." \ - % str(int_ping_count)) - - if int_ping_count == 3: - plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'}) - def check_recently_added(): @@ -289,6 +269,14 @@ def check_recently_added(): def check_server_response(): + try: + web_socket.start_thread() + except: + logger.warn(u"Websocket :: Unable to open connection.") + + +def check_server_access(): + with monitor_lock: pms_connect = pmsconnect.PmsConnect() server_response = pms_connect.get_server_response() diff --git a/plexpy/config.py b/plexpy/config.py index 003f06af..5d208ffb 100644 --- a/plexpy/config.py +++ b/plexpy/config.py @@ -568,6 +568,8 @@ _CONFIG_DEFINITIONS = { 'UPDATE_NOTIFIERS_DB': (int, 'General', 1), 'VERIFY_SSL_CERT': (bool_int, 'Advanced', 1), 'VIDEO_LOGGING_ENABLE': (int, 'Monitoring', 1), + 'WEBSOCKET_CONNECTION_ATTEMPTS': (int, 'Advanced', 5), + 'WEBSOCKET_CONNECTION_TIMEOUT': (int, 'Advanced', 5), 'XBMC_ENABLED': (int, 'XBMC', 0), 'XBMC_HOST': (str, 'XBMC', ''), 'XBMC_PASSWORD': (str, 'XBMC', ''), diff --git a/plexpy/plexivity_import.py b/plexpy/plexivity_import.py index 7cd09b1c..b482c2a0 100644 --- a/plexpy/plexivity_import.py +++ b/plexpy/plexivity_import.py @@ -284,7 +284,7 @@ def import_from_plexivity(database=None, table_name=None, import_ignore_interval hours=0, minutes=0, seconds=0) plexpy.schedule_job(activity_pinger.check_recently_added, 'Check for recently added items', hours=0, minutes=0, seconds=0) - plexpy.schedule_job(activity_pinger.check_server_response, 'Check for Plex remote access', + plexpy.schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access', hours=0, minutes=0, seconds=0) ap = activity_processor.ActivityProcessor() diff --git a/plexpy/plexwatch_import.py b/plexpy/plexwatch_import.py index 413af0ae..7cd3d4d9 100644 --- a/plexpy/plexwatch_import.py +++ b/plexpy/plexwatch_import.py @@ -275,7 +275,7 @@ def import_from_plexwatch(database=None, table_name=None, import_ignore_interval hours=0, minutes=0, seconds=0) plexpy.schedule_job(activity_pinger.check_recently_added, 'Check for recently added items', hours=0, minutes=0, seconds=0) - plexpy.schedule_job(activity_pinger.check_server_response, 'Check for Plex remote access', + plexpy.schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access', hours=0, minutes=0, seconds=0) ap = activity_processor.ActivityProcessor() diff --git a/plexpy/web_socket.py b/plexpy/web_socket.py index cdd6b0b8..35458f2b 100644 --- a/plexpy/web_socket.py +++ b/plexpy/web_socket.py @@ -66,19 +66,25 @@ def run(): ws_reconnect = False reconnects = 0 - # Try an open the websocket connection - if it fails after 15 retries fallback to polling - while not plexpy.WS_CONNECTED and reconnects <= 15: + # Try an open the websocket connection + while not plexpy.WS_CONNECTED and reconnects <= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS: try: logger.info(u"PlexPy WebSocket :: Opening%s websocket, connection attempt %s." % (secure, str(reconnects + 1))) ws = create_connection(uri, header=header) reconnects = 0 logger.info(u"PlexPy WebSocket :: Ready") plexpy.WS_CONNECTED = True + + if not plexpy.PLEX_SERVER_UP: + logger.info(u"PlexPy WebSocket :: The Plex Media Server is back up.") + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'}) + plexpy.PLEX_SERVER_UP = True + plexpy.initialize_scheduler() except IOError as e: logger.error(u"PlexPy WebSocket :: %s." % e) reconnects += 1 - time.sleep(5) + time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT) while plexpy.WS_CONNECTED: try: @@ -87,16 +93,17 @@ def run(): # successfully received data, reset reconnects counter reconnects = 0 except (websocket.WebSocketConnectionClosedException, Exception): - if reconnects <= 15: + if reconnects <= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS: reconnects += 1 # Sleep 5 between connection attempts if reconnects > 1: - time.sleep(5) + time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT) logger.warn(u"PlexPy WebSocket :: Connection has closed, reconnection attempt %s." % reconnects) try: ws = create_connection(uri, header=header) + logger.info(u"PlexPy WebSocket :: Ready") except IOError as e: logger.info(u"PlexPy WebSocket :: %s." % e) @@ -113,8 +120,13 @@ def run(): start_thread() if not plexpy.WS_CONNECTED and not ws_reconnect: - logger.error(u"PlexPy WebSocket :: Connection unavailable, Plex server is down.") - plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'}) + logger.error(u"PlexPy WebSocket :: Connection unavailable.") + + if plexpy.PLEX_SERVER_UP: + logger.info(u"PlexPy WebSocket :: Unable to get an internal response from the server, Plex server is down.") + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'}) + plexpy.PLEX_SERVER_UP = False + plexpy.initialize_scheduler() logger.debug(u"PlexPy WebSocket :: Leaving thread.")