From 9cd3758d15a43889bc6dac643c31069c31d31b01 Mon Sep 17 00:00:00 2001 From: tyler breese Date: Wed, 18 Oct 2023 01:04:24 -0400 Subject: [PATCH] multiple websocket connections --- data/interfaces/default/index.html | 6 +- plexpy/__init__.py | 11 +- plexpy/activity_handler.py | 19 +- plexpy/activity_pinger.py | 4 +- plexpy/activity_processor.py | 19 +- plexpy/plextv.py | 34 +- plexpy/pmsconnect.py | 25 +- plexpy/server_manager.py | 4 +- plexpy/web_socket.py | 514 ++++++++++++++++------------- plexpy/webserve.py | 4 +- 10 files changed, 341 insertions(+), 299 deletions(-) diff --git a/data/interfaces/default/index.html b/data/interfaces/default/index.html index 4ec4560b..6c7d1606 100644 --- a/data/interfaces/default/index.html +++ b/data/interfaces/default/index.html @@ -5,7 +5,7 @@ <%def name="body()"> -<% from plexpy import PLEX_SERVER_UP %> +<% from plexpy import web_socket %>
% for section in config['home_sections']: % if section == 'current_activity': @@ -23,7 +23,7 @@
- % if PLEX_SERVER_UP: + % if web_socket.isServerUp():
  Checking for activity...
% elif config['pms_is_cloud']:
Plex Cloud server is sleeping.
@@ -124,7 +124,7 @@
- % if PLEX_SERVER_UP: + % if web_socket.isServerUp():
  Looking for new items...
% elif config['pms_is_cloud']:
Plex Cloud server is sleeping.
diff --git a/plexpy/__init__.py b/plexpy/__init__.py index b6e763ed..c5681b6f 100644 --- a/plexpy/__init__.py +++ b/plexpy/__init__.py @@ -142,11 +142,6 @@ AUTH_ENABLED = None DEV = False -WEBSOCKET = None -WS_CONNECTED = False -PLEX_SERVER_UP = None -PLEX_REMOTE_ACCESS_UP = None - TRACKER = None WIN_SYS_TRAY_ICON = None @@ -472,7 +467,7 @@ def initialize_scheduler(): schedule_job(config.make_backup, 'Backup Tautulli config', hours=backup_hours, minutes=0, seconds=0, args=(True, True)) - if WS_CONNECTED and CONFIG.PMS_IP and CONFIG.PMS_TOKEN: + if web_socket.isServerUp() and CONFIG.PMS_IP and CONFIG.PMS_TOKEN: schedule_job(plextv.get_server_resources, 'Refresh Plex server URLs', hours=12 * (not bool(CONFIG.PMS_URL_MANUAL)), minutes=0, seconds=0) @@ -490,8 +485,8 @@ def initialize_scheduler(): schedule_job(activity_pinger.connect_server, 'Check for server response', hours=0, minutes=0, seconds=0) - schedule_job(web_socket.send_ping, 'Websocket ping', - hours=0, minutes=0, seconds=10 * bool(CONFIG.WEBSOCKET_MONITOR_PING_PONG)) + # schedule_job(web_socket.send_ping, 'Websocket ping', + # hours=0, minutes=0, seconds=10 * bool(CONFIG.WEBSOCKET_MONITOR_PING_PONG)) else: # Cancel all jobs diff --git a/plexpy/activity_handler.py b/plexpy/activity_handler.py index 532d3c50..f5d4a3c3 100644 --- a/plexpy/activity_handler.py +++ b/plexpy/activity_handler.py @@ -50,7 +50,7 @@ RECENTLY_ADDED_QUEUE = {} class ActivityHandler(object): - def __init__(self, timeline): + def __init__(self, timeline, server_id): self.ap = activity_processor.ActivityProcessor() self.timeline = timeline @@ -70,19 +70,20 @@ class ActivityHandler(object): self.db_session = None self.session = None self.metadata = None + self.server_id = server_id def get_db_session(self): # Retrieve the session data from our temp table - self.db_session = self.ap.get_session_by_key(session_key=self.session_key) + self.db_session = self.ap.get_session_by_key(session_key=self.session_key, server_id=self.server_id) def get_metadata(self, skip_cache=False): if self.metadata is None: cache_key = None if skip_cache else self.session_key - for pms_connect in server_manager.ServerManger().get_server_list(): - metadata = pms_connect.get_metadata_details(rating_key=self.rating_key, cache_key=cache_key) + pms_connect = server_manager.ServerManger().get_server(server_id=self.server_id) + metadata = pms_connect.get_metadata_details(rating_key=self.rating_key, cache_key=cache_key) - if metadata: - self.metadata = metadata + if metadata: + self.metadata = metadata def get_live_session(self, skip_cache=False): for pms_connect in server_manager.ServerManger().get_server_list(): @@ -439,7 +440,7 @@ class ActivityHandler(object): class TimelineHandler(object): - def __init__(self, timeline): + def __init__(self, timeline, server_id=None): self.timeline = timeline self.rating_key = None @@ -458,6 +459,7 @@ class TimelineHandler(object): self.metadata_state = self.timeline.get('metadataState') self.media_state = self.timeline.get('mediaState') self.queue_size = self.timeline.get('queueSize') + self.server_id = server_id # This function receives events from our websocket connection def process(self): @@ -546,10 +548,11 @@ class TimelineHandler(object): class ReachabilityHandler(object): - def __init__(self, data): + def __init__(self, data, server_id=None): self.data = data self.is_reachable = self.data.get('reachable', False) + self.server_id = server_id def remote_access_enabled(self): for pms_connect in server_manager.ServerManger().get_server_list(): diff --git a/plexpy/activity_pinger.py b/plexpy/activity_pinger.py index 4cb3f601..5f4b6d6d 100644 --- a/plexpy/activity_pinger.py +++ b/plexpy/activity_pinger.py @@ -252,8 +252,8 @@ def connect_server(log=True, startup=False): logger.info("Tautulli Monitor :: Attempting to reconnect Plex server...") try: - web_socket.start_thread() - except Exception as e: + web_socket.start_threads() + except Exception as e: logger.error("Websocket :: Unable to open connection: %s." % e) diff --git a/plexpy/activity_processor.py b/plexpy/activity_processor.py index 1d5394b3..69f05132 100644 --- a/plexpy/activity_processor.py +++ b/plexpy/activity_processor.py @@ -193,7 +193,7 @@ class ActivityProcessor(object): user_details = user_data.get_details(user_id=session['user_id']) library_data = libraries.Libraries() - library_details = library_data.get_details(section_id=section_id) + library_details = library_data.get_details(section_id=section_id, server_id=session['server_id']) # Return false if failed to retrieve user or library details if not user_details or not library_details: @@ -322,7 +322,8 @@ class ActivityProcessor(object): 'view_offset': session['view_offset'], 'section_id': metadata['section_id'], 'secure': session['secure'], - 'relayed': session['relayed'] + 'relayed': session['relayed'], + 'server_id': session['server_id'] } # logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history transaction..." @@ -575,21 +576,21 @@ class ActivityProcessor(object): sessions = self.db.select(query, args) return sessions - def get_session_by_key(self, session_key=None): + def get_session_by_key(self, session_key=None, server_id=None): if str(session_key).isdigit(): session = self.db.select_single("SELECT * FROM sessions " - "WHERE session_key = ? ", - args=[session_key]) + "WHERE session_key = ? AND server_id = ? ", + args=[session_key, server_id]) if session: return session return None - def get_session_by_id(self, session_id=None): - if session_id: + def get_session_by_id(self, session_id=None, server_id=None): + if session_id and server_id: session = self.db.select_single("SELECT * FROM sessions " - "WHERE session_id = ? ", - args=[session_id]) + "WHERE session_id = ? AND server_id = ? ", + args=[session_id, server_id]) if session: return session diff --git a/plexpy/plextv.py b/plexpy/plextv.py index 8bab1d51..35a2089a 100644 --- a/plexpy/plextv.py +++ b/plexpy/plextv.py @@ -44,7 +44,7 @@ else: from plexpy.plex import Plex -def get_server_resources(return_presence=False, return_server=False, return_info=False, **kwargs): +def get_server_resources(return_presence=False, return_server=False, return_info=False, return_servers=False, **kwargs): if not return_presence and not return_info: logger.info("Tautulli PlexTV :: Requesting resources for server...") @@ -75,11 +75,18 @@ def get_server_resources(return_presence=False, return_server=False, return_info else: scheme = 'http' + plex_tv = PlexTV() + + if return_servers: + return plex_tv.get_servers_connections(pms_identifier=server['pms_identifier'], + pms_ip=server['pms_ip'], + pms_port=server['pms_port'], + include_https=server['pms_ssl']) + fallback_url = '{scheme}://{hostname}:{port}'.format(scheme=scheme, hostname=server['pms_ip'], port=server['pms_port']) - plex_tv = PlexTV() result = plex_tv.get_server_connections(pms_identifier=server['pms_identifier'], pms_ip=server['pms_ip'], pms_port=server['pms_port'], @@ -591,6 +598,14 @@ class PlexTV(object): return response.ok def get_server_connections(self, pms_identifier='', pms_ip='', pms_port=32400, include_https=True): + servers = self.get_servers_connections(pms_identifier, pms_ip, pms_port, include_https) + for server in servers: + if server['pms_identifier'] == pms_identifier: + return server + return None + + + def get_servers_connections(self, pms_identifier='', pms_ip='', pms_port=32400, include_https=True): if not pms_identifier: logger.error("Tautulli PlexTV :: Unable to retrieve server connections: no pms_identifier provided.") @@ -629,16 +644,14 @@ class PlexTV(object): server['connections'] = conn return server - server = {} + servers = [] # Try to match the device for a in xml_head: - if helpers.get_xml_attr(a, 'clientIdentifier') == pms_identifier: - server = get_connections(a) - break + servers.append(get_connections(a)) # Else no device match found - if not server: + if not len(servers): # Try to match the PMS_IP and PMS_PORT for a in xml_head: if helpers.get_xml_attr(a, 'provides') == 'server': @@ -647,13 +660,10 @@ class PlexTV(object): for connection in connections: if helpers.get_xml_attr(connection, 'address') == pms_ip and \ helpers.get_xml_attr(connection, 'port') == str(pms_port): - server = get_connections(a) + servers.append(get_connections(a)) break - if server.get('connections'): - break - - return server + return servers def get_server_times(self): servers = self.get_plextv_server_list(output_format='xml') diff --git a/plexpy/pmsconnect.py b/plexpy/pmsconnect.py index 6ff3a968..d440a4e7 100644 --- a/plexpy/pmsconnect.py +++ b/plexpy/pmsconnect.py @@ -74,7 +74,7 @@ class PmsConnect(object): Retrieve data from Plex Server """ - def __init__(self, url=None, token=None): + def __init__(self, server_id=None, url=None, token=None): self.url = url self.token = token @@ -85,6 +85,8 @@ class PmsConnect(object): port=plexpy.CONFIG.PMS_PORT) self.timeout = plexpy.CONFIG.PMS_TIMEOUT + self.server_id = server_id + if not self.token: # Check if we should use the admin token, or the guest server token if session.get_session_user_id(): @@ -116,7 +118,7 @@ class PmsConnect(object): return request - def get_sessions_terminate(self, session_id='', reason=''): + def get_sessions_terminate(self, session_id='', reason='', server_id=None): """ Return current sessions. @@ -2330,7 +2332,7 @@ class PmsConnect(object): return session_output - def terminate_session(self, session_key='', session_id='', message=''): + def terminate_session(self, session_key='', session_id='', message='', server_id=None): """ Terminates a streaming session. """ @@ -2345,26 +2347,19 @@ class PmsConnect(object): ap = activity_processor.ActivityProcessor() if session_key: - session = ap.get_session_by_key(session_key=session_key) + session = ap.get_session_by_key(session_key=session_key, server_id=server_id) if session and not session_id: session_id = session['session_id'] - elif session_id: - session = ap.get_session_by_id(session_id=session_id) - if session and not session_key: - session_key = session['session_key'] - - else: - session = session_key = session_id = None - - if not session: - msg = 'Invalid session_key (%s) or session_id (%s)' % (session_key, session_id) + # you only need session_id to terminate + if session_key and not session_id: + msg = 'Invalid session_key (%s)' % (session_key) logger.warn("Tautulli Pmsconnect :: Failed to terminate session: %s." % msg) return msg if session_id: logger.info("Tautulli Pmsconnect :: Terminating session %s (session_id %s)." % (session_key, session_id)) - response = self.get_sessions_terminate(session_id=session_id, reason=message) + response = self.get_sessions_terminate(session_id=session_id, reason=message, server_id=server_id) return response.ok else: msg = 'Missing session_id' diff --git a/plexpy/server_manager.py b/plexpy/server_manager.py index d1de054e..d39448d7 100644 --- a/plexpy/server_manager.py +++ b/plexpy/server_manager.py @@ -32,7 +32,7 @@ class ServerManger(object): pmsServers = [] for server in pmsconnect.PmsConnect().get_servers_info(): url = 'http://{hostname}:{port}'.format(hostname=server["host"], port=server["port"]) - pmsServers.append(pmsconnect.PmsConnect(url=url)) + pmsServers.append(pmsconnect.PmsConnect(server['machine_identifier'], url=url)) return pmsServers def get_server(self, server_id): @@ -40,5 +40,5 @@ class ServerManger(object): for server in pmsconnect.PmsConnect().get_servers_info(): if server['machine_identifier'] == server_id: url = 'http://{hostname}:{port}'.format(hostname=server["host"], port=server["port"]) - return pmsconnect.PmsConnect(url=url) + return pmsconnect.PmsConnect(server_id, url=url) return None \ No newline at end of file diff --git a/plexpy/web_socket.py b/plexpy/web_socket.py index 830ab590..0a609e98 100644 --- a/plexpy/web_socket.py +++ b/plexpy/web_socket.py @@ -27,6 +27,7 @@ import time import certifi import websocket +from websocket import create_connection import plexpy if plexpy.PYTHON2: @@ -35,6 +36,7 @@ if plexpy.PYTHON2: import activity_processor import database import logger + import plextv import server_manager else: from plexpy import activity_handler @@ -42,6 +44,7 @@ else: from plexpy import activity_processor from plexpy import database from plexpy import logger + from plexpy import plextv from plexpy import server_manager @@ -52,7 +55,10 @@ pong_timer = None pong_count = 0 -def start_thread(): +def isServerUp(): + return True + +def start_threads(): try: # Check for any existing sessions on start up activity_pinger.check_active_sessions(ws_request=True) @@ -61,270 +67,302 @@ def start_thread(): logger.warn("Tautulli WebSocket :: Attempt to fix by flushing temporary sessions...") database.delete_sessions() - # Start the websocket listener on it's own thread - thread = threading.Thread(target=run) - thread.daemon = True - thread.start() + plex_servers = plextv.get_server_resources(return_servers=True) + + owned_servers = server_manager.ServerManger().get_server_list() -def on_connect(): - if plexpy.PLEX_SERVER_UP is None: - plexpy.PLEX_SERVER_UP = True - - if not plexpy.PLEX_SERVER_UP: - logger.info("Tautulli WebSocket :: The Plex Media Server is back up.") - plexpy.PLEX_SERVER_UP = True - - if activity_handler.ACTIVITY_SCHED.get_job('on_intdown'): - logger.debug("Tautulli WebSocket :: Cancelling scheduled Plex server down callback.") - activity_handler.schedule_callback('on_intdown', remove_job=True) - else: - on_intup() - - plexpy.initialize_scheduler() - if plexpy.CONFIG.WEBSOCKET_MONITOR_PING_PONG: - send_ping() - - -def on_disconnect(): - if plexpy.PLEX_SERVER_UP is None: - plexpy.PLEX_SERVER_UP = False - - if plexpy.PLEX_SERVER_UP: - logger.info("Tautulli WebSocket :: Unable to get a response from the server, Plex server is down.") - plexpy.PLEX_SERVER_UP = False - - logger.debug("Tautulli WebSocket :: Scheduling Plex server down callback in %d seconds.", - plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD) - activity_handler.schedule_callback('on_intdown', func=on_intdown, - seconds=plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD) - - activity_processor.ActivityProcessor().set_temp_stopped() - plexpy.initialize_scheduler() - - -def on_intdown(): - plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'}) - - -def on_intup(): - plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'}) - - -def reconnect(): - close() - logger.info("Tautulli WebSocket :: Reconnecting websocket...") - start_thread() - - -def shutdown(): - global ws_shutdown - ws_shutdown = True - close() - - -def close(): - logger.info("Tautulli WebSocket :: Disconnecting websocket...") - plexpy.WEBSOCKET.close() - plexpy.WS_CONNECTED = False - - -def send_ping(): - if plexpy.WS_CONNECTED: - # logger.debug("Tautulli WebSocket :: Sending ping.") - plexpy.WEBSOCKET.ping("Hi?") - - global pong_timer - pong_timer = threading.Timer(5.0, wait_pong) - pong_timer.daemon = True - pong_timer.start() - - -def wait_pong(): - global pong_count - pong_count += 1 - - logger.warn("Tautulli WebSocket :: Failed to receive pong from websocket, ping attempt %s." % str(pong_count)) - - if pong_count >= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS: - pong_count = 0 - close() - - -def receive_pong(): - # logger.debug("Tautulli WebSocket :: Received pong.") - global pong_timer - global pong_count - if pong_timer: - pong_timer = pong_timer.cancel() - pong_count = 0 - - -def run(): - from websocket import create_connection - - if plexpy.CONFIG.PMS_SSL and plexpy.CONFIG.PMS_URL[:5] == 'https': - uri = plexpy.CONFIG.PMS_URL.replace('https://', 'wss://') + '/:/websockets/notifications' - secure = 'secure ' - if plexpy.CONFIG.VERIFY_SSL_CERT: - sslopt = {'ca_certs': certifi.where()} - else: - sslopt = {'cert_reqs': ssl.CERT_NONE} - else: - uri = 'ws://%s:%s/:/websockets/notifications' % ( - plexpy.CONFIG.PMS_IP, - plexpy.CONFIG.PMS_PORT - ) - secure = '' - sslopt = None - - # Set authentication token (if one is available) - if plexpy.CONFIG.PMS_TOKEN: - header = {"X-Plex-Token": plexpy.CONFIG.PMS_TOKEN} - else: - header = None - - timeout = plexpy.CONFIG.PMS_TIMEOUT - - global ws_shutdown - ws_shutdown = False - reconnects = 0 - - # Try an open the websocket connection - logger.info("Tautulli WebSocket :: Opening %swebsocket." % secure) - try: - plexpy.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt) - logger.info("Tautulli WebSocket :: Ready") - plexpy.WS_CONNECTED = True - except (websocket.WebSocketException, IOError, Exception) as e: - logger.error("Tautulli WebSocket :: %s.", e) - - if plexpy.WS_CONNECTED: - on_connect() - - while plexpy.WS_CONNECTED: - try: - process(*receive(plexpy.WEBSOCKET)) - - # successfully received data, reset reconnects counter - reconnects = 0 - - except websocket.WebSocketConnectionClosedException: - if ws_shutdown: + # Start each websocket listener on it's own thread per server + for owned_server in owned_servers: + for server in plex_servers: + if owned_server.server_id == server['pms_identifier']: + for connection in server['connections']: + if connection['local']: + wss=WebSocketServer(connection, owned_server.server_id) + thread = threading.Thread(target=wss.run) + thread.daemon = True + thread.start() + break break - if reconnects == 0: - logger.warn("Tautulli WebSocket :: Connection has closed.") - if not plexpy.CONFIG.PMS_IS_CLOUD and reconnects < plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS: - reconnects += 1 +class WebSocketServer(object): + def __init__(self, server, server_id): + self.server=server + self.WEBSOCKET = None + self.WS_CONNECTED = False + self.PLEX_SERVER_UP = None + self.PLEX_REMOTE_ACCESS_UP = None + self.server_id = server_id - # Sleep 5 between connection attempts - if reconnects > 1: - time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT) + def on_connect(self): + if self.PLEX_SERVER_UP is None: + self.PLEX_SERVER_UP = True - logger.warn("Tautulli WebSocket :: Reconnection attempt %s." % str(reconnects)) - - try: - plexpy.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt) - logger.info("Tautulli WebSocket :: Ready") - plexpy.WS_CONNECTED = True - except (websocket.WebSocketException, IOError, Exception) as e: - logger.error("Tautulli WebSocket :: %s.", e) + if not self.PLEX_SERVER_UP: + logger.info("Tautulli WebSocket :: The Plex Media Server is back up.") + self.PLEX_SERVER_UP = True + if activity_handler.ACTIVITY_SCHED.get_job('on_intdown'): + logger.debug("Tautulli WebSocket :: Cancelling scheduled Plex server down callback.") + activity_handler.schedule_callback('on_intdown', remove_job=True) else: - close() - break + self.on_intup() - except (websocket.WebSocketException, Exception) as e: - if ws_shutdown: - break + plexpy.initialize_scheduler() + if plexpy.CONFIG.WEBSOCKET_MONITOR_PING_PONG: + self.send_ping() + + def on_disconnect(self): + if self.PLEX_SERVER_UP is None: + self.PLEX_SERVER_UP = False + + if self.PLEX_SERVER_UP: + logger.info("Tautulli WebSocket :: Unable to get a response from the server, Plex server is down.") + self.PLEX_SERVER_UP = False + + logger.debug("Tautulli WebSocket :: Scheduling Plex server down callback in %d seconds.", + plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD) + activity_handler.schedule_callback('on_intdown', func=self.on_intdown, + seconds=plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD) + + activity_processor.ActivityProcessor().set_temp_stopped() + plexpy.initialize_scheduler() + + + def on_intdown(self): + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'}) + + + def on_intup(self): + plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'}) + + + def reconnect(self): + self.close() + logger.info("Tautulli WebSocket :: Reconnecting websocket...") + self.run() + + + def shutdown(self): + global ws_shutdown + ws_shutdown = True + self.close() + + + def close(self): + logger.info("Tautulli WebSocket :: Disconnecting websocket...") + self.WEBSOCKET.close() + self.WS_CONNECTED = False + + + def send_ping(self): + if self.WS_CONNECTED: + # logger.debug("Tautulli WebSocket :: Sending ping.") + self.WEBSOCKET.ping("Hi?") + + global pong_timer + pong_timer = threading.Timer(5.0, self.wait_pong) + pong_timer.daemon = True + pong_timer.start() + + + def wait_pong(self): + global pong_count + pong_count += 1 + + logger.warn("Tautulli WebSocket :: Failed to receive pong from websocket, ping attempt %s." % str(pong_count)) + + if pong_count >= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS: + pong_count = 0 + self.close() + + + def receive_pong(self): + # logger.debug("Tautulli WebSocket :: Received pong.") + global pong_timer + global pong_count + if pong_timer: + pong_timer = pong_timer.cancel() + pong_count = 0 + + + def run(self): + + if plexpy.CONFIG.PMS_SSL: + uri = "" + if self.server: + uri = self.server['uri'].replace('https://', 'wss://') + '/:/websockets/notifications' + else: + uri = plexpy.CONFIG.PMS_URL.replace('https://', 'wss://') + '/:/websockets/notifications' + secure = 'secure ' + if plexpy.CONFIG.VERIFY_SSL_CERT: + sslopt = {'ca_certs': certifi.where()} + else: + sslopt = {'cert_reqs': ssl.CERT_NONE} + else: + uri = "" + if self.server: + uri = 'ws://%s:%s/:/websockets/notifications' % ( + self.server['address'], + self.server['port'] + ) + else: + uri = 'ws://%s:%s/:/websockets/notifications' % ( + plexpy.CONFIG.PMS_IP, + plexpy.CONFIG.PMS_PORT + ) + secure = '' + sslopt = None + + # Set authentication token (if one is available) + if plexpy.CONFIG.PMS_TOKEN: + header = {"X-Plex-Token": plexpy.CONFIG.PMS_TOKEN} + else: + header = None + + timeout = plexpy.CONFIG.PMS_TIMEOUT + + global ws_shutdown + ws_shutdown = False + reconnects = 0 + + # Try an open the websocket connection + logger.info("Tautulli WebSocket :: Opening %swebsocket." % secure) + try: + self.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt) + logger.info("Tautulli WebSocket :: Ready") + self.WS_CONNECTED = True + except (websocket.WebSocketException, IOError, Exception) as e: logger.error("Tautulli WebSocket :: %s.", e) - close() - break - if not plexpy.WS_CONNECTED and not ws_shutdown: - on_disconnect() + if self.WS_CONNECTED: + self.on_connect() - logger.debug("Tautulli WebSocket :: Leaving thread.") + while self.WS_CONNECTED: + try: + self.process(*self.receive(self.WEBSOCKET)) + + # successfully received data, reset reconnects counter + reconnects = 0 + + except websocket.WebSocketConnectionClosedException: + if ws_shutdown: + break + + if reconnects == 0: + logger.warn("Tautulli WebSocket :: Connection has closed.") + + if not plexpy.CONFIG.PMS_IS_CLOUD and reconnects < plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS: + reconnects += 1 + + # Sleep 5 between connection attempts + if reconnects > 1: + time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT) + + logger.warn("Tautulli WebSocket :: Reconnection attempt %s." % str(reconnects)) + + try: + self.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt) + logger.info("Tautulli WebSocket :: Ready") + self.WS_CONNECTED = True + except (websocket.WebSocketException, IOError, Exception) as e: + logger.error("Tautulli WebSocket :: %s.", e) + + else: + self.close() + break + + except (websocket.WebSocketException, Exception) as e: + if ws_shutdown: + break + + logger.error("Tautulli WebSocket :: %s.", e) + self.close() + break + + if not self.WS_CONNECTED and not ws_shutdown: + self.on_disconnect() + + logger.debug("Tautulli WebSocket :: Leaving thread.") -def receive(ws): - frame = ws.recv_frame() + def receive(self, ws): + frame = ws.recv_frame() - if not frame: - raise websocket.WebSocketException("Not a valid frame %s" % frame) - elif frame.opcode in opcode_data: - return frame.opcode, frame.data - elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: - ws.send_close() - return frame.opcode, None - elif frame.opcode == websocket.ABNF.OPCODE_PING: - # logger.debug("Tautulli WebSocket :: Received ping, sending pong.") - ws.pong("Hi!") - elif frame.opcode == websocket.ABNF.OPCODE_PONG: - receive_pong() + if not frame: + raise websocket.WebSocketException("Not a valid frame %s" % frame) + elif frame.opcode in opcode_data: + return frame.opcode, frame.data + elif frame.opcode == websocket.ABNF.OPCODE_CLOSE: + ws.send_close() + return frame.opcode, None + elif frame.opcode == websocket.ABNF.OPCODE_PING: + # logger.debug("Tautulli WebSocket :: Received ping, sending pong.") + ws.pong("Hi!") + elif frame.opcode == websocket.ABNF.OPCODE_PONG: + self.receive_pong() - return None, None + return None, None -def process(opcode, data): - if opcode not in opcode_data: - return False - - try: - data = data.decode('utf-8') - logger.websocket_debug(data) - event = json.loads(data) - except Exception as e: - logger.warn("Tautulli WebSocket :: Error decoding message from websocket: %s" % e) - logger.websocket_error(data) - return False - - event = event.get('NotificationContainer', event) - event_type = event.get('type') - - if not event_type: - return False - - if event_type == 'playing': - event_data = event.get('PlaySessionStateNotification', event.get('_children', {})) - - if not event_data: - logger.debug("Tautulli WebSocket :: Session event found but unable to get websocket data.") + def process(self, opcode, data): + if opcode not in opcode_data: return False try: - activity = activity_handler.ActivityHandler(timeline=event_data[0]) - activity.process() + data = data.decode('utf-8') + logger.websocket_debug(data) + event = json.loads(data) except Exception as e: - logger.exception("Tautulli WebSocket :: Failed to process session data: %s." % e) - - if event_type == 'timeline': - event_data = event.get('TimelineEntry', event.get('_children', {})) - - if not event_data: - logger.debug("Tautulli WebSocket :: Timeline event found but unable to get websocket data.") + logger.warn("Tautulli WebSocket :: Error decoding message from websocket: %s" % e) + logger.websocket_error(data) return False - try: - activity = activity_handler.TimelineHandler(timeline=event_data[0]) - activity.process() - except Exception as e: - logger.exception("Tautulli WebSocket :: Failed to process timeline data: %s." % e) + event = event.get('NotificationContainer', event) + event_type = event.get('type') - if event_type == 'reachability': - event_data = event.get('ReachabilityNotification', event.get('_children', {})) - - if not event_data: - logger.debug("Tautulli WebSocket :: Reachability event found but unable to get websocket data.") + if not event_type: return False - try: - activity = activity_handler.ReachabilityHandler(data=event_data[0]) - activity.process() - except Exception as e: - logger.exception("Tautulli WebSocket :: Failed to process reachability data: %s." % e) + if event_type == 'playing': + event_data = event.get('PlaySessionStateNotification', event.get('_children', {})) - return True + if not event_data: + logger.debug("Tautulli WebSocket :: Session event found but unable to get websocket data.") + return False + + try: + activity = activity_handler.ActivityHandler(timeline=event_data[0], server_id=self.server_id) + activity.process() + except Exception as e: + logger.exception("Tautulli WebSocket :: Failed to process session data: %s." % e) + + if event_type == 'timeline': + event_data = event.get('TimelineEntry', event.get('_children', {})) + + if not event_data: + logger.debug("Tautulli WebSocket :: Timeline event found but unable to get websocket data.") + return False + + try: + activity = activity_handler.TimelineHandler(timeline=event_data[0], server_id=self.server_id) + activity.process() + except Exception as e: + logger.exception("Tautulli WebSocket :: Failed to process timeline data: %s." % e) + + if event_type == 'reachability': + event_data = event.get('ReachabilityNotification', event.get('_children', {})) + + if not event_data: + logger.debug("Tautulli WebSocket :: Reachability event found but unable to get websocket data.") + return False + + try: + activity = activity_handler.ReachabilityHandler(data=event_data[0], server_id=self.server_id) + activity.process() + except Exception as e: + logger.exception("Tautulli WebSocket :: Failed to process reachability data: %s." % e) + + return True diff --git a/plexpy/webserve.py b/plexpy/webserve.py index 044ac5a6..b80807ae 100644 --- a/plexpy/webserve.py +++ b/plexpy/webserve.py @@ -393,7 +393,7 @@ class WebInterface(object): ``` """ pms_connect = server_manager.ServerManger().get_server(server_id=server_id) - result = pms_connect.terminate_session(session_key=session_key, session_id=session_id, message=message) + result = pms_connect.terminate_session(session_key=session_key, session_id=session_id, message=message, server_id=server_id) if isinstance(result, str): return {'result': 'error', 'message': 'Failed to terminate session: {}.'.format(result)} @@ -6962,7 +6962,7 @@ class WebInterface(object): ``` """ cherrypy.response.headers['Cache-Control'] = "max-age=0,no-cache,no-store" - status = {'result': 'success', 'connected': plexpy.PLEX_SERVER_UP} + status = {'result': 'success', 'connected': web_socket.isServerUp()} return status