mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-06 05:01:14 -07:00
Rework task scheduler for websocket only
This commit is contained in:
parent
16c7d27508
commit
eb3c189ab6
7 changed files with 58 additions and 39 deletions
|
@ -194,7 +194,6 @@ def main():
|
||||||
web_socket.start_thread()
|
web_socket.start_thread()
|
||||||
except:
|
except:
|
||||||
logger.warn(u"Websocket :: Unable to open connection.")
|
logger.warn(u"Websocket :: Unable to open connection.")
|
||||||
plexpy.initialize_scheduler()
|
|
||||||
|
|
||||||
# Force the http port if neccessary
|
# Force the http port if neccessary
|
||||||
if args.port:
|
if args.port:
|
||||||
|
|
|
@ -85,6 +85,7 @@ HTTP_ROOT = None
|
||||||
DEV = False
|
DEV = False
|
||||||
|
|
||||||
WS_CONNECTED = False
|
WS_CONNECTED = False
|
||||||
|
PLEX_SERVER_UP = True
|
||||||
|
|
||||||
|
|
||||||
def initialize(config_file):
|
def initialize(config_file):
|
||||||
|
@ -310,15 +311,15 @@ def initialize_scheduler():
|
||||||
monitor_seconds = CONFIG.MONITORING_INTERVAL if CONFIG.MONITORING_INTERVAL >= 30 else 30
|
monitor_seconds = CONFIG.MONITORING_INTERVAL if CONFIG.MONITORING_INTERVAL >= 30 else 30
|
||||||
|
|
||||||
#schedule_job(activity_pinger.check_active_sessions, 'Check for active sessions',
|
#schedule_job(activity_pinger.check_active_sessions, 'Check for active sessions',
|
||||||
# hours=0, minutes=0, seconds=1)
|
# hours=0, minutes=0, seconds=1)
|
||||||
#schedule_job(activity_pinger.check_recently_added, 'Check for recently added items',
|
#schedule_job(activity_pinger.check_recently_added, 'Check for recently added items',
|
||||||
# hours=0, minutes=0, seconds=monitor_seconds * bool(CONFIG.NOTIFY_RECENTLY_ADDED))
|
# hours=0, minutes=0, seconds=monitor_seconds * bool(CONFIG.NOTIFY_RECENTLY_ADDED))
|
||||||
schedule_job(plextv.get_real_pms_url, 'Refresh Plex server URLs',
|
schedule_job(plextv.get_real_pms_url, 'Refresh Plex server URLs',
|
||||||
hours=12, minutes=0, seconds=0)
|
hours=12, minutes=0, seconds=0)
|
||||||
schedule_job(pmsconnect.get_server_friendly_name, 'Refresh Plex server name',
|
schedule_job(pmsconnect.get_server_friendly_name, 'Refresh Plex server name',
|
||||||
hours=12, minutes=0, seconds=0)
|
hours=12, minutes=0, seconds=0)
|
||||||
|
|
||||||
schedule_job(activity_pinger.check_server_response, 'Check for Plex remote access',
|
schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access',
|
||||||
hours=0, minutes=0, seconds=monitor_seconds * bool(CONFIG.MONITOR_REMOTE_ACCESS))
|
hours=0, minutes=0, seconds=monitor_seconds * bool(CONFIG.MONITOR_REMOTE_ACCESS))
|
||||||
schedule_job(activity_pinger.check_server_updates, 'Check for Plex updates',
|
schedule_job(activity_pinger.check_server_updates, 'Check for Plex updates',
|
||||||
hours=12 * bool(CONFIG.MONITOR_PMS_UPDATES), minutes=0, seconds=0)
|
hours=12 * bool(CONFIG.MONITOR_PMS_UPDATES), minutes=0, seconds=0)
|
||||||
|
@ -329,13 +330,32 @@ def initialize_scheduler():
|
||||||
|
|
||||||
schedule_job(plextv.refresh_users, 'Refresh users list',
|
schedule_job(plextv.refresh_users, 'Refresh users list',
|
||||||
hours=user_hours, minutes=0, seconds=0)
|
hours=user_hours, minutes=0, seconds=0)
|
||||||
|
|
||||||
schedule_job(pmsconnect.refresh_libraries, 'Refresh libraries list',
|
schedule_job(pmsconnect.refresh_libraries, 'Refresh libraries list',
|
||||||
hours=library_hours, minutes=0, seconds=0)
|
hours=library_hours, minutes=0, seconds=0)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
## Add new taks to check and reconnect websocket
|
# Cancel all jobs
|
||||||
pass
|
schedule_job(plextv.get_real_pms_url, 'Refresh Plex server URLs',
|
||||||
|
hours=0, minutes=0, seconds=0)
|
||||||
|
schedule_job(pmsconnect.get_server_friendly_name, 'Refresh Plex server name',
|
||||||
|
hours=0, minutes=0, seconds=0)
|
||||||
|
|
||||||
|
schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access',
|
||||||
|
hours=0, minutes=0, seconds=0)
|
||||||
|
schedule_job(activity_pinger.check_server_updates, 'Check for Plex updates',
|
||||||
|
hours=0, minutes=0, seconds=0)
|
||||||
|
|
||||||
|
schedule_job(plextv.refresh_users, 'Refresh users list',
|
||||||
|
hours=0, minutes=0, seconds=0)
|
||||||
|
schedule_job(pmsconnect.refresh_libraries, 'Refresh libraries list',
|
||||||
|
hours=0, minutes=0, seconds=0)
|
||||||
|
|
||||||
|
# Schedule job to reconnect websocket
|
||||||
|
response_seconds = CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS * CONFIG.WEBSOCKET_CONNECTION_TIMEOUT
|
||||||
|
response_seconds = 60 if response_seconds < 60 else response_seconds
|
||||||
|
|
||||||
|
schedule_job(activity_pinger.check_server_response, 'Check server response',
|
||||||
|
hours=0, minutes=0, seconds=response_seconds)
|
||||||
|
|
||||||
# Start scheduler
|
# Start scheduler
|
||||||
if start_jobs and len(SCHED.get_jobs()):
|
if start_jobs and len(SCHED.get_jobs()):
|
||||||
|
@ -344,8 +364,6 @@ def initialize_scheduler():
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(e)
|
logger.info(e)
|
||||||
|
|
||||||
# Debug
|
|
||||||
#SCHED.print_jobs()
|
|
||||||
|
|
||||||
def schedule_job(function, name, hours=0, minutes=0, seconds=0, args=None):
|
def schedule_job(function, name, hours=0, minutes=0, seconds=0, args=None):
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -26,6 +26,7 @@ import notification_handler
|
||||||
import notifiers
|
import notifiers
|
||||||
import plextv
|
import plextv
|
||||||
import pmsconnect
|
import pmsconnect
|
||||||
|
import web_socket
|
||||||
|
|
||||||
|
|
||||||
monitor_lock = threading.Lock()
|
monitor_lock = threading.Lock()
|
||||||
|
@ -42,16 +43,7 @@ def check_active_sessions(ws_request=False):
|
||||||
monitor_process = activity_processor.ActivityProcessor()
|
monitor_process = activity_processor.ActivityProcessor()
|
||||||
# logger.debug(u"PlexPy Monitor :: Checking for active streams.")
|
# logger.debug(u"PlexPy Monitor :: Checking for active streams.")
|
||||||
|
|
||||||
global int_ping_count
|
|
||||||
|
|
||||||
if session_list:
|
if session_list:
|
||||||
if int_ping_count >= 3:
|
|
||||||
logger.info(u"PlexPy Monitor :: The Plex Media Server is back up.")
|
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'})
|
|
||||||
|
|
||||||
int_ping_count = 0
|
|
||||||
|
|
||||||
media_container = session_list['sessions']
|
media_container = session_list['sessions']
|
||||||
|
|
||||||
# Check our temp table for what we must do with the new streams
|
# Check our temp table for what we must do with the new streams
|
||||||
|
@ -203,18 +195,6 @@ def check_active_sessions(ws_request=False):
|
||||||
else:
|
else:
|
||||||
logger.debug(u"PlexPy Monitor :: Unable to read session list.")
|
logger.debug(u"PlexPy Monitor :: Unable to read session list.")
|
||||||
|
|
||||||
if int_ping_count == 0:
|
|
||||||
# Temporarily set the stopped time for all sessions
|
|
||||||
stopped_time = int(time.time())
|
|
||||||
monitor_db.action('UPDATE sessions SET stopped = ?', [stopped_time])
|
|
||||||
|
|
||||||
int_ping_count += 1
|
|
||||||
logger.warn(u"PlexPy Monitor :: Unable to get an internal response from the server, ping attempt %s." \
|
|
||||||
% str(int_ping_count))
|
|
||||||
|
|
||||||
if int_ping_count == 3:
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
|
|
||||||
|
|
||||||
|
|
||||||
def check_recently_added():
|
def check_recently_added():
|
||||||
|
|
||||||
|
@ -289,6 +269,14 @@ def check_recently_added():
|
||||||
|
|
||||||
def check_server_response():
|
def check_server_response():
|
||||||
|
|
||||||
|
try:
|
||||||
|
web_socket.start_thread()
|
||||||
|
except:
|
||||||
|
logger.warn(u"Websocket :: Unable to open connection.")
|
||||||
|
|
||||||
|
|
||||||
|
def check_server_access():
|
||||||
|
|
||||||
with monitor_lock:
|
with monitor_lock:
|
||||||
pms_connect = pmsconnect.PmsConnect()
|
pms_connect = pmsconnect.PmsConnect()
|
||||||
server_response = pms_connect.get_server_response()
|
server_response = pms_connect.get_server_response()
|
||||||
|
|
|
@ -568,6 +568,8 @@ _CONFIG_DEFINITIONS = {
|
||||||
'UPDATE_NOTIFIERS_DB': (int, 'General', 1),
|
'UPDATE_NOTIFIERS_DB': (int, 'General', 1),
|
||||||
'VERIFY_SSL_CERT': (bool_int, 'Advanced', 1),
|
'VERIFY_SSL_CERT': (bool_int, 'Advanced', 1),
|
||||||
'VIDEO_LOGGING_ENABLE': (int, 'Monitoring', 1),
|
'VIDEO_LOGGING_ENABLE': (int, 'Monitoring', 1),
|
||||||
|
'WEBSOCKET_CONNECTION_ATTEMPTS': (int, 'Advanced', 5),
|
||||||
|
'WEBSOCKET_CONNECTION_TIMEOUT': (int, 'Advanced', 5),
|
||||||
'XBMC_ENABLED': (int, 'XBMC', 0),
|
'XBMC_ENABLED': (int, 'XBMC', 0),
|
||||||
'XBMC_HOST': (str, 'XBMC', ''),
|
'XBMC_HOST': (str, 'XBMC', ''),
|
||||||
'XBMC_PASSWORD': (str, 'XBMC', ''),
|
'XBMC_PASSWORD': (str, 'XBMC', ''),
|
||||||
|
|
|
@ -284,7 +284,7 @@ def import_from_plexivity(database=None, table_name=None, import_ignore_interval
|
||||||
hours=0, minutes=0, seconds=0)
|
hours=0, minutes=0, seconds=0)
|
||||||
plexpy.schedule_job(activity_pinger.check_recently_added, 'Check for recently added items',
|
plexpy.schedule_job(activity_pinger.check_recently_added, 'Check for recently added items',
|
||||||
hours=0, minutes=0, seconds=0)
|
hours=0, minutes=0, seconds=0)
|
||||||
plexpy.schedule_job(activity_pinger.check_server_response, 'Check for Plex remote access',
|
plexpy.schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access',
|
||||||
hours=0, minutes=0, seconds=0)
|
hours=0, minutes=0, seconds=0)
|
||||||
|
|
||||||
ap = activity_processor.ActivityProcessor()
|
ap = activity_processor.ActivityProcessor()
|
||||||
|
|
|
@ -275,7 +275,7 @@ def import_from_plexwatch(database=None, table_name=None, import_ignore_interval
|
||||||
hours=0, minutes=0, seconds=0)
|
hours=0, minutes=0, seconds=0)
|
||||||
plexpy.schedule_job(activity_pinger.check_recently_added, 'Check for recently added items',
|
plexpy.schedule_job(activity_pinger.check_recently_added, 'Check for recently added items',
|
||||||
hours=0, minutes=0, seconds=0)
|
hours=0, minutes=0, seconds=0)
|
||||||
plexpy.schedule_job(activity_pinger.check_server_response, 'Check for Plex remote access',
|
plexpy.schedule_job(activity_pinger.check_server_access, 'Check for Plex remote access',
|
||||||
hours=0, minutes=0, seconds=0)
|
hours=0, minutes=0, seconds=0)
|
||||||
|
|
||||||
ap = activity_processor.ActivityProcessor()
|
ap = activity_processor.ActivityProcessor()
|
||||||
|
|
|
@ -66,19 +66,25 @@ def run():
|
||||||
ws_reconnect = False
|
ws_reconnect = False
|
||||||
reconnects = 0
|
reconnects = 0
|
||||||
|
|
||||||
# Try an open the websocket connection - if it fails after 15 retries fallback to polling
|
# Try an open the websocket connection
|
||||||
while not plexpy.WS_CONNECTED and reconnects <= 15:
|
while not plexpy.WS_CONNECTED and reconnects <= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
|
||||||
try:
|
try:
|
||||||
logger.info(u"PlexPy WebSocket :: Opening%s websocket, connection attempt %s." % (secure, str(reconnects + 1)))
|
logger.info(u"PlexPy WebSocket :: Opening%s websocket, connection attempt %s." % (secure, str(reconnects + 1)))
|
||||||
ws = create_connection(uri, header=header)
|
ws = create_connection(uri, header=header)
|
||||||
reconnects = 0
|
reconnects = 0
|
||||||
logger.info(u"PlexPy WebSocket :: Ready")
|
logger.info(u"PlexPy WebSocket :: Ready")
|
||||||
plexpy.WS_CONNECTED = True
|
plexpy.WS_CONNECTED = True
|
||||||
|
|
||||||
|
if not plexpy.PLEX_SERVER_UP:
|
||||||
|
logger.info(u"PlexPy WebSocket :: The Plex Media Server is back up.")
|
||||||
|
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'})
|
||||||
|
plexpy.PLEX_SERVER_UP = True
|
||||||
|
|
||||||
plexpy.initialize_scheduler()
|
plexpy.initialize_scheduler()
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
logger.error(u"PlexPy WebSocket :: %s." % e)
|
logger.error(u"PlexPy WebSocket :: %s." % e)
|
||||||
reconnects += 1
|
reconnects += 1
|
||||||
time.sleep(5)
|
time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT)
|
||||||
|
|
||||||
while plexpy.WS_CONNECTED:
|
while plexpy.WS_CONNECTED:
|
||||||
try:
|
try:
|
||||||
|
@ -87,16 +93,17 @@ def run():
|
||||||
# successfully received data, reset reconnects counter
|
# successfully received data, reset reconnects counter
|
||||||
reconnects = 0
|
reconnects = 0
|
||||||
except (websocket.WebSocketConnectionClosedException, Exception):
|
except (websocket.WebSocketConnectionClosedException, Exception):
|
||||||
if reconnects <= 15:
|
if reconnects <= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
|
||||||
reconnects += 1
|
reconnects += 1
|
||||||
|
|
||||||
# Sleep 5 between connection attempts
|
# Sleep 5 between connection attempts
|
||||||
if reconnects > 1:
|
if reconnects > 1:
|
||||||
time.sleep(5)
|
time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT)
|
||||||
|
|
||||||
logger.warn(u"PlexPy WebSocket :: Connection has closed, reconnection attempt %s." % reconnects)
|
logger.warn(u"PlexPy WebSocket :: Connection has closed, reconnection attempt %s." % reconnects)
|
||||||
try:
|
try:
|
||||||
ws = create_connection(uri, header=header)
|
ws = create_connection(uri, header=header)
|
||||||
|
logger.info(u"PlexPy WebSocket :: Ready")
|
||||||
except IOError as e:
|
except IOError as e:
|
||||||
logger.info(u"PlexPy WebSocket :: %s." % e)
|
logger.info(u"PlexPy WebSocket :: %s." % e)
|
||||||
|
|
||||||
|
@ -113,8 +120,13 @@ def run():
|
||||||
start_thread()
|
start_thread()
|
||||||
|
|
||||||
if not plexpy.WS_CONNECTED and not ws_reconnect:
|
if not plexpy.WS_CONNECTED and not ws_reconnect:
|
||||||
logger.error(u"PlexPy WebSocket :: Connection unavailable, Plex server is down.")
|
logger.error(u"PlexPy WebSocket :: Connection unavailable.")
|
||||||
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
|
|
||||||
|
if plexpy.PLEX_SERVER_UP:
|
||||||
|
logger.info(u"PlexPy WebSocket :: Unable to get an internal response from the server, Plex server is down.")
|
||||||
|
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
|
||||||
|
plexpy.PLEX_SERVER_UP = False
|
||||||
|
|
||||||
plexpy.initialize_scheduler()
|
plexpy.initialize_scheduler()
|
||||||
|
|
||||||
logger.debug(u"PlexPy WebSocket :: Leaving thread.")
|
logger.debug(u"PlexPy WebSocket :: Leaving thread.")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue