- % if PLEX_SERVER_UP:
+ % if web_socket.isServerUp():
Looking for new items...
% elif config['pms_is_cloud']:
Plex Cloud server is sleeping.
diff --git a/plexpy/__init__.py b/plexpy/__init__.py
index b6e763ed..c5681b6f 100644
--- a/plexpy/__init__.py
+++ b/plexpy/__init__.py
@@ -142,11 +142,6 @@ AUTH_ENABLED = None
DEV = False
-WEBSOCKET = None
-WS_CONNECTED = False
-PLEX_SERVER_UP = None
-PLEX_REMOTE_ACCESS_UP = None
-
TRACKER = None
WIN_SYS_TRAY_ICON = None
@@ -472,7 +467,7 @@ def initialize_scheduler():
schedule_job(config.make_backup, 'Backup Tautulli config',
hours=backup_hours, minutes=0, seconds=0, args=(True, True))
- if WS_CONNECTED and CONFIG.PMS_IP and CONFIG.PMS_TOKEN:
+ if web_socket.isServerUp() and CONFIG.PMS_IP and CONFIG.PMS_TOKEN:
schedule_job(plextv.get_server_resources, 'Refresh Plex server URLs',
hours=12 * (not bool(CONFIG.PMS_URL_MANUAL)), minutes=0, seconds=0)
@@ -490,8 +485,8 @@ def initialize_scheduler():
schedule_job(activity_pinger.connect_server, 'Check for server response',
hours=0, minutes=0, seconds=0)
- schedule_job(web_socket.send_ping, 'Websocket ping',
- hours=0, minutes=0, seconds=10 * bool(CONFIG.WEBSOCKET_MONITOR_PING_PONG))
+ # schedule_job(web_socket.send_ping, 'Websocket ping',
+ # hours=0, minutes=0, seconds=10 * bool(CONFIG.WEBSOCKET_MONITOR_PING_PONG))
else:
# Cancel all jobs
diff --git a/plexpy/activity_handler.py b/plexpy/activity_handler.py
index 532d3c50..f5d4a3c3 100644
--- a/plexpy/activity_handler.py
+++ b/plexpy/activity_handler.py
@@ -50,7 +50,7 @@ RECENTLY_ADDED_QUEUE = {}
class ActivityHandler(object):
- def __init__(self, timeline):
+ def __init__(self, timeline, server_id):
self.ap = activity_processor.ActivityProcessor()
self.timeline = timeline
@@ -70,19 +70,20 @@ class ActivityHandler(object):
self.db_session = None
self.session = None
self.metadata = None
+ self.server_id = server_id
def get_db_session(self):
# Retrieve the session data from our temp table
- self.db_session = self.ap.get_session_by_key(session_key=self.session_key)
+ self.db_session = self.ap.get_session_by_key(session_key=self.session_key, server_id=self.server_id)
def get_metadata(self, skip_cache=False):
if self.metadata is None:
cache_key = None if skip_cache else self.session_key
- for pms_connect in server_manager.ServerManger().get_server_list():
- metadata = pms_connect.get_metadata_details(rating_key=self.rating_key, cache_key=cache_key)
+ pms_connect = server_manager.ServerManger().get_server(server_id=self.server_id)
+ metadata = pms_connect.get_metadata_details(rating_key=self.rating_key, cache_key=cache_key)
- if metadata:
- self.metadata = metadata
+ if metadata:
+ self.metadata = metadata
def get_live_session(self, skip_cache=False):
for pms_connect in server_manager.ServerManger().get_server_list():
@@ -439,7 +440,7 @@ class ActivityHandler(object):
class TimelineHandler(object):
- def __init__(self, timeline):
+ def __init__(self, timeline, server_id=None):
self.timeline = timeline
self.rating_key = None
@@ -458,6 +459,7 @@ class TimelineHandler(object):
self.metadata_state = self.timeline.get('metadataState')
self.media_state = self.timeline.get('mediaState')
self.queue_size = self.timeline.get('queueSize')
+ self.server_id = server_id
# This function receives events from our websocket connection
def process(self):
@@ -546,10 +548,11 @@ class TimelineHandler(object):
class ReachabilityHandler(object):
- def __init__(self, data):
+ def __init__(self, data, server_id=None):
self.data = data
self.is_reachable = self.data.get('reachable', False)
+ self.server_id = server_id
def remote_access_enabled(self):
for pms_connect in server_manager.ServerManger().get_server_list():
diff --git a/plexpy/activity_pinger.py b/plexpy/activity_pinger.py
index 4cb3f601..5f4b6d6d 100644
--- a/plexpy/activity_pinger.py
+++ b/plexpy/activity_pinger.py
@@ -252,8 +252,8 @@ def connect_server(log=True, startup=False):
logger.info("Tautulli Monitor :: Attempting to reconnect Plex server...")
try:
- web_socket.start_thread()
- except Exception as e:
+ web_socket.start_threads()
+ except Exception as e:
logger.error("Websocket :: Unable to open connection: %s." % e)
diff --git a/plexpy/activity_processor.py b/plexpy/activity_processor.py
index 1d5394b3..69f05132 100644
--- a/plexpy/activity_processor.py
+++ b/plexpy/activity_processor.py
@@ -193,7 +193,7 @@ class ActivityProcessor(object):
user_details = user_data.get_details(user_id=session['user_id'])
library_data = libraries.Libraries()
- library_details = library_data.get_details(section_id=section_id)
+ library_details = library_data.get_details(section_id=section_id, server_id=session['server_id'])
# Return false if failed to retrieve user or library details
if not user_details or not library_details:
@@ -322,7 +322,8 @@ class ActivityProcessor(object):
'view_offset': session['view_offset'],
'section_id': metadata['section_id'],
'secure': session['secure'],
- 'relayed': session['relayed']
+ 'relayed': session['relayed'],
+ 'server_id': session['server_id']
}
# logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history transaction..."
@@ -575,21 +576,21 @@ class ActivityProcessor(object):
sessions = self.db.select(query, args)
return sessions
- def get_session_by_key(self, session_key=None):
+ def get_session_by_key(self, session_key=None, server_id=None):
if str(session_key).isdigit():
session = self.db.select_single("SELECT * FROM sessions "
- "WHERE session_key = ? ",
- args=[session_key])
+ "WHERE session_key = ? AND server_id = ? ",
+ args=[session_key, server_id])
if session:
return session
return None
- def get_session_by_id(self, session_id=None):
- if session_id:
+ def get_session_by_id(self, session_id=None, server_id=None):
+ if session_id and server_id:
session = self.db.select_single("SELECT * FROM sessions "
- "WHERE session_id = ? ",
- args=[session_id])
+ "WHERE session_id = ? AND server_id = ? ",
+ args=[session_id, server_id])
if session:
return session
diff --git a/plexpy/plextv.py b/plexpy/plextv.py
index 8bab1d51..35a2089a 100644
--- a/plexpy/plextv.py
+++ b/plexpy/plextv.py
@@ -44,7 +44,7 @@ else:
from plexpy.plex import Plex
-def get_server_resources(return_presence=False, return_server=False, return_info=False, **kwargs):
+def get_server_resources(return_presence=False, return_server=False, return_info=False, return_servers=False, **kwargs):
if not return_presence and not return_info:
logger.info("Tautulli PlexTV :: Requesting resources for server...")
@@ -75,11 +75,18 @@ def get_server_resources(return_presence=False, return_server=False, return_info
else:
scheme = 'http'
+ plex_tv = PlexTV()
+
+ if return_servers:
+ return plex_tv.get_servers_connections(pms_identifier=server['pms_identifier'],
+ pms_ip=server['pms_ip'],
+ pms_port=server['pms_port'],
+ include_https=server['pms_ssl'])
+
fallback_url = '{scheme}://{hostname}:{port}'.format(scheme=scheme,
hostname=server['pms_ip'],
port=server['pms_port'])
- plex_tv = PlexTV()
result = plex_tv.get_server_connections(pms_identifier=server['pms_identifier'],
pms_ip=server['pms_ip'],
pms_port=server['pms_port'],
@@ -591,6 +598,14 @@ class PlexTV(object):
return response.ok
def get_server_connections(self, pms_identifier='', pms_ip='', pms_port=32400, include_https=True):
+ servers = self.get_servers_connections(pms_identifier, pms_ip, pms_port, include_https)
+ for server in servers:
+ if server['pms_identifier'] == pms_identifier:
+ return server
+ return None
+
+
+ def get_servers_connections(self, pms_identifier='', pms_ip='', pms_port=32400, include_https=True):
if not pms_identifier:
logger.error("Tautulli PlexTV :: Unable to retrieve server connections: no pms_identifier provided.")
@@ -629,16 +644,14 @@ class PlexTV(object):
server['connections'] = conn
return server
- server = {}
+ servers = []
# Try to match the device
for a in xml_head:
- if helpers.get_xml_attr(a, 'clientIdentifier') == pms_identifier:
- server = get_connections(a)
- break
+ servers.append(get_connections(a))
# Else no device match found
- if not server:
+ if not len(servers):
# Try to match the PMS_IP and PMS_PORT
for a in xml_head:
if helpers.get_xml_attr(a, 'provides') == 'server':
@@ -647,13 +660,10 @@ class PlexTV(object):
for connection in connections:
if helpers.get_xml_attr(connection, 'address') == pms_ip and \
helpers.get_xml_attr(connection, 'port') == str(pms_port):
- server = get_connections(a)
+ servers.append(get_connections(a))
break
- if server.get('connections'):
- break
-
- return server
+ return servers
def get_server_times(self):
servers = self.get_plextv_server_list(output_format='xml')
diff --git a/plexpy/pmsconnect.py b/plexpy/pmsconnect.py
index 6ff3a968..d440a4e7 100644
--- a/plexpy/pmsconnect.py
+++ b/plexpy/pmsconnect.py
@@ -74,7 +74,7 @@ class PmsConnect(object):
Retrieve data from Plex Server
"""
- def __init__(self, url=None, token=None):
+ def __init__(self, server_id=None, url=None, token=None):
self.url = url
self.token = token
@@ -85,6 +85,8 @@ class PmsConnect(object):
port=plexpy.CONFIG.PMS_PORT)
self.timeout = plexpy.CONFIG.PMS_TIMEOUT
+ self.server_id = server_id
+
if not self.token:
# Check if we should use the admin token, or the guest server token
if session.get_session_user_id():
@@ -116,7 +118,7 @@ class PmsConnect(object):
return request
- def get_sessions_terminate(self, session_id='', reason=''):
+ def get_sessions_terminate(self, session_id='', reason='', server_id=None):
"""
Return current sessions.
@@ -2330,7 +2332,7 @@ class PmsConnect(object):
return session_output
- def terminate_session(self, session_key='', session_id='', message=''):
+ def terminate_session(self, session_key='', session_id='', message='', server_id=None):
"""
Terminates a streaming session.
"""
@@ -2345,26 +2347,19 @@ class PmsConnect(object):
ap = activity_processor.ActivityProcessor()
if session_key:
- session = ap.get_session_by_key(session_key=session_key)
+ session = ap.get_session_by_key(session_key=session_key, server_id=server_id)
if session and not session_id:
session_id = session['session_id']
- elif session_id:
- session = ap.get_session_by_id(session_id=session_id)
- if session and not session_key:
- session_key = session['session_key']
-
- else:
- session = session_key = session_id = None
-
- if not session:
- msg = 'Invalid session_key (%s) or session_id (%s)' % (session_key, session_id)
+ # you only need session_id to terminate
+ if session_key and not session_id:
+ msg = 'Invalid session_key (%s)' % (session_key)
logger.warn("Tautulli Pmsconnect :: Failed to terminate session: %s." % msg)
return msg
if session_id:
logger.info("Tautulli Pmsconnect :: Terminating session %s (session_id %s)." % (session_key, session_id))
- response = self.get_sessions_terminate(session_id=session_id, reason=message)
+ response = self.get_sessions_terminate(session_id=session_id, reason=message, server_id=server_id)
return response.ok
else:
msg = 'Missing session_id'
diff --git a/plexpy/server_manager.py b/plexpy/server_manager.py
index d1de054e..d39448d7 100644
--- a/plexpy/server_manager.py
+++ b/plexpy/server_manager.py
@@ -32,7 +32,7 @@ class ServerManger(object):
pmsServers = []
for server in pmsconnect.PmsConnect().get_servers_info():
url = 'http://{hostname}:{port}'.format(hostname=server["host"], port=server["port"])
- pmsServers.append(pmsconnect.PmsConnect(url=url))
+ pmsServers.append(pmsconnect.PmsConnect(server['machine_identifier'], url=url))
return pmsServers
def get_server(self, server_id):
@@ -40,5 +40,5 @@ class ServerManger(object):
for server in pmsconnect.PmsConnect().get_servers_info():
if server['machine_identifier'] == server_id:
url = 'http://{hostname}:{port}'.format(hostname=server["host"], port=server["port"])
- return pmsconnect.PmsConnect(url=url)
+ return pmsconnect.PmsConnect(server_id, url=url)
return None
\ No newline at end of file
diff --git a/plexpy/web_socket.py b/plexpy/web_socket.py
index 830ab590..0a609e98 100644
--- a/plexpy/web_socket.py
+++ b/plexpy/web_socket.py
@@ -27,6 +27,7 @@ import time
import certifi
import websocket
+from websocket import create_connection
import plexpy
if plexpy.PYTHON2:
@@ -35,6 +36,7 @@ if plexpy.PYTHON2:
import activity_processor
import database
import logger
+ import plextv
import server_manager
else:
from plexpy import activity_handler
@@ -42,6 +44,7 @@ else:
from plexpy import activity_processor
from plexpy import database
from plexpy import logger
+ from plexpy import plextv
from plexpy import server_manager
@@ -52,7 +55,10 @@ pong_timer = None
pong_count = 0
-def start_thread():
+def isServerUp():
+ return True
+
+def start_threads():
try:
# Check for any existing sessions on start up
activity_pinger.check_active_sessions(ws_request=True)
@@ -61,270 +67,302 @@ def start_thread():
logger.warn("Tautulli WebSocket :: Attempt to fix by flushing temporary sessions...")
database.delete_sessions()
- # Start the websocket listener on it's own thread
- thread = threading.Thread(target=run)
- thread.daemon = True
- thread.start()
+ plex_servers = plextv.get_server_resources(return_servers=True)
+
+ owned_servers = server_manager.ServerManger().get_server_list()
-def on_connect():
- if plexpy.PLEX_SERVER_UP is None:
- plexpy.PLEX_SERVER_UP = True
-
- if not plexpy.PLEX_SERVER_UP:
- logger.info("Tautulli WebSocket :: The Plex Media Server is back up.")
- plexpy.PLEX_SERVER_UP = True
-
- if activity_handler.ACTIVITY_SCHED.get_job('on_intdown'):
- logger.debug("Tautulli WebSocket :: Cancelling scheduled Plex server down callback.")
- activity_handler.schedule_callback('on_intdown', remove_job=True)
- else:
- on_intup()
-
- plexpy.initialize_scheduler()
- if plexpy.CONFIG.WEBSOCKET_MONITOR_PING_PONG:
- send_ping()
-
-
-def on_disconnect():
- if plexpy.PLEX_SERVER_UP is None:
- plexpy.PLEX_SERVER_UP = False
-
- if plexpy.PLEX_SERVER_UP:
- logger.info("Tautulli WebSocket :: Unable to get a response from the server, Plex server is down.")
- plexpy.PLEX_SERVER_UP = False
-
- logger.debug("Tautulli WebSocket :: Scheduling Plex server down callback in %d seconds.",
- plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
- activity_handler.schedule_callback('on_intdown', func=on_intdown,
- seconds=plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
-
- activity_processor.ActivityProcessor().set_temp_stopped()
- plexpy.initialize_scheduler()
-
-
-def on_intdown():
- plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
-
-
-def on_intup():
- plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'})
-
-
-def reconnect():
- close()
- logger.info("Tautulli WebSocket :: Reconnecting websocket...")
- start_thread()
-
-
-def shutdown():
- global ws_shutdown
- ws_shutdown = True
- close()
-
-
-def close():
- logger.info("Tautulli WebSocket :: Disconnecting websocket...")
- plexpy.WEBSOCKET.close()
- plexpy.WS_CONNECTED = False
-
-
-def send_ping():
- if plexpy.WS_CONNECTED:
- # logger.debug("Tautulli WebSocket :: Sending ping.")
- plexpy.WEBSOCKET.ping("Hi?")
-
- global pong_timer
- pong_timer = threading.Timer(5.0, wait_pong)
- pong_timer.daemon = True
- pong_timer.start()
-
-
-def wait_pong():
- global pong_count
- pong_count += 1
-
- logger.warn("Tautulli WebSocket :: Failed to receive pong from websocket, ping attempt %s." % str(pong_count))
-
- if pong_count >= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
- pong_count = 0
- close()
-
-
-def receive_pong():
- # logger.debug("Tautulli WebSocket :: Received pong.")
- global pong_timer
- global pong_count
- if pong_timer:
- pong_timer = pong_timer.cancel()
- pong_count = 0
-
-
-def run():
- from websocket import create_connection
-
- if plexpy.CONFIG.PMS_SSL and plexpy.CONFIG.PMS_URL[:5] == 'https':
- uri = plexpy.CONFIG.PMS_URL.replace('https://', 'wss://') + '/:/websockets/notifications'
- secure = 'secure '
- if plexpy.CONFIG.VERIFY_SSL_CERT:
- sslopt = {'ca_certs': certifi.where()}
- else:
- sslopt = {'cert_reqs': ssl.CERT_NONE}
- else:
- uri = 'ws://%s:%s/:/websockets/notifications' % (
- plexpy.CONFIG.PMS_IP,
- plexpy.CONFIG.PMS_PORT
- )
- secure = ''
- sslopt = None
-
- # Set authentication token (if one is available)
- if plexpy.CONFIG.PMS_TOKEN:
- header = {"X-Plex-Token": plexpy.CONFIG.PMS_TOKEN}
- else:
- header = None
-
- timeout = plexpy.CONFIG.PMS_TIMEOUT
-
- global ws_shutdown
- ws_shutdown = False
- reconnects = 0
-
- # Try an open the websocket connection
- logger.info("Tautulli WebSocket :: Opening %swebsocket." % secure)
- try:
- plexpy.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
- logger.info("Tautulli WebSocket :: Ready")
- plexpy.WS_CONNECTED = True
- except (websocket.WebSocketException, IOError, Exception) as e:
- logger.error("Tautulli WebSocket :: %s.", e)
-
- if plexpy.WS_CONNECTED:
- on_connect()
-
- while plexpy.WS_CONNECTED:
- try:
- process(*receive(plexpy.WEBSOCKET))
-
- # successfully received data, reset reconnects counter
- reconnects = 0
-
- except websocket.WebSocketConnectionClosedException:
- if ws_shutdown:
+ # Start each websocket listener on it's own thread per server
+ for owned_server in owned_servers:
+ for server in plex_servers:
+ if owned_server.server_id == server['pms_identifier']:
+ for connection in server['connections']:
+ if connection['local']:
+ wss=WebSocketServer(connection, owned_server.server_id)
+ thread = threading.Thread(target=wss.run)
+ thread.daemon = True
+ thread.start()
+ break
break
- if reconnects == 0:
- logger.warn("Tautulli WebSocket :: Connection has closed.")
- if not plexpy.CONFIG.PMS_IS_CLOUD and reconnects < plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
- reconnects += 1
+class WebSocketServer(object):
+ def __init__(self, server, server_id):
+ self.server=server
+ self.WEBSOCKET = None
+ self.WS_CONNECTED = False
+ self.PLEX_SERVER_UP = None
+ self.PLEX_REMOTE_ACCESS_UP = None
+ self.server_id = server_id
- # Sleep 5 between connection attempts
- if reconnects > 1:
- time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT)
+ def on_connect(self):
+ if self.PLEX_SERVER_UP is None:
+ self.PLEX_SERVER_UP = True
- logger.warn("Tautulli WebSocket :: Reconnection attempt %s." % str(reconnects))
-
- try:
- plexpy.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
- logger.info("Tautulli WebSocket :: Ready")
- plexpy.WS_CONNECTED = True
- except (websocket.WebSocketException, IOError, Exception) as e:
- logger.error("Tautulli WebSocket :: %s.", e)
+ if not self.PLEX_SERVER_UP:
+ logger.info("Tautulli WebSocket :: The Plex Media Server is back up.")
+ self.PLEX_SERVER_UP = True
+ if activity_handler.ACTIVITY_SCHED.get_job('on_intdown'):
+ logger.debug("Tautulli WebSocket :: Cancelling scheduled Plex server down callback.")
+ activity_handler.schedule_callback('on_intdown', remove_job=True)
else:
- close()
- break
+ self.on_intup()
- except (websocket.WebSocketException, Exception) as e:
- if ws_shutdown:
- break
+ plexpy.initialize_scheduler()
+ if plexpy.CONFIG.WEBSOCKET_MONITOR_PING_PONG:
+ self.send_ping()
+
+ def on_disconnect(self):
+ if self.PLEX_SERVER_UP is None:
+ self.PLEX_SERVER_UP = False
+
+ if self.PLEX_SERVER_UP:
+ logger.info("Tautulli WebSocket :: Unable to get a response from the server, Plex server is down.")
+ self.PLEX_SERVER_UP = False
+
+ logger.debug("Tautulli WebSocket :: Scheduling Plex server down callback in %d seconds.",
+ plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
+ activity_handler.schedule_callback('on_intdown', func=self.on_intdown,
+ seconds=plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
+
+ activity_processor.ActivityProcessor().set_temp_stopped()
+ plexpy.initialize_scheduler()
+
+
+ def on_intdown(self):
+ plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
+
+
+ def on_intup(self):
+ plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'})
+
+
+ def reconnect(self):
+ self.close()
+ logger.info("Tautulli WebSocket :: Reconnecting websocket...")
+ self.run()
+
+
+ def shutdown(self):
+ global ws_shutdown
+ ws_shutdown = True
+ self.close()
+
+
+ def close(self):
+ logger.info("Tautulli WebSocket :: Disconnecting websocket...")
+ self.WEBSOCKET.close()
+ self.WS_CONNECTED = False
+
+
+ def send_ping(self):
+ if self.WS_CONNECTED:
+ # logger.debug("Tautulli WebSocket :: Sending ping.")
+ self.WEBSOCKET.ping("Hi?")
+
+ global pong_timer
+ pong_timer = threading.Timer(5.0, self.wait_pong)
+ pong_timer.daemon = True
+ pong_timer.start()
+
+
+ def wait_pong(self):
+ global pong_count
+ pong_count += 1
+
+ logger.warn("Tautulli WebSocket :: Failed to receive pong from websocket, ping attempt %s." % str(pong_count))
+
+ if pong_count >= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
+ pong_count = 0
+ self.close()
+
+
+ def receive_pong(self):
+ # logger.debug("Tautulli WebSocket :: Received pong.")
+ global pong_timer
+ global pong_count
+ if pong_timer:
+ pong_timer = pong_timer.cancel()
+ pong_count = 0
+
+
+ def run(self):
+
+ if plexpy.CONFIG.PMS_SSL:
+ uri = ""
+ if self.server:
+ uri = self.server['uri'].replace('https://', 'wss://') + '/:/websockets/notifications'
+ else:
+ uri = plexpy.CONFIG.PMS_URL.replace('https://', 'wss://') + '/:/websockets/notifications'
+ secure = 'secure '
+ if plexpy.CONFIG.VERIFY_SSL_CERT:
+ sslopt = {'ca_certs': certifi.where()}
+ else:
+ sslopt = {'cert_reqs': ssl.CERT_NONE}
+ else:
+ uri = ""
+ if self.server:
+ uri = 'ws://%s:%s/:/websockets/notifications' % (
+ self.server['address'],
+ self.server['port']
+ )
+ else:
+ uri = 'ws://%s:%s/:/websockets/notifications' % (
+ plexpy.CONFIG.PMS_IP,
+ plexpy.CONFIG.PMS_PORT
+ )
+ secure = ''
+ sslopt = None
+
+ # Set authentication token (if one is available)
+ if plexpy.CONFIG.PMS_TOKEN:
+ header = {"X-Plex-Token": plexpy.CONFIG.PMS_TOKEN}
+ else:
+ header = None
+
+ timeout = plexpy.CONFIG.PMS_TIMEOUT
+
+ global ws_shutdown
+ ws_shutdown = False
+ reconnects = 0
+
+ # Try an open the websocket connection
+ logger.info("Tautulli WebSocket :: Opening %swebsocket." % secure)
+ try:
+ self.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
+ logger.info("Tautulli WebSocket :: Ready")
+ self.WS_CONNECTED = True
+ except (websocket.WebSocketException, IOError, Exception) as e:
logger.error("Tautulli WebSocket :: %s.", e)
- close()
- break
- if not plexpy.WS_CONNECTED and not ws_shutdown:
- on_disconnect()
+ if self.WS_CONNECTED:
+ self.on_connect()
- logger.debug("Tautulli WebSocket :: Leaving thread.")
+ while self.WS_CONNECTED:
+ try:
+ self.process(*self.receive(self.WEBSOCKET))
+
+ # successfully received data, reset reconnects counter
+ reconnects = 0
+
+ except websocket.WebSocketConnectionClosedException:
+ if ws_shutdown:
+ break
+
+ if reconnects == 0:
+ logger.warn("Tautulli WebSocket :: Connection has closed.")
+
+ if not plexpy.CONFIG.PMS_IS_CLOUD and reconnects < plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
+ reconnects += 1
+
+ # Sleep 5 between connection attempts
+ if reconnects > 1:
+ time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT)
+
+ logger.warn("Tautulli WebSocket :: Reconnection attempt %s." % str(reconnects))
+
+ try:
+ self.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
+ logger.info("Tautulli WebSocket :: Ready")
+ self.WS_CONNECTED = True
+ except (websocket.WebSocketException, IOError, Exception) as e:
+ logger.error("Tautulli WebSocket :: %s.", e)
+
+ else:
+ self.close()
+ break
+
+ except (websocket.WebSocketException, Exception) as e:
+ if ws_shutdown:
+ break
+
+ logger.error("Tautulli WebSocket :: %s.", e)
+ self.close()
+ break
+
+ if not self.WS_CONNECTED and not ws_shutdown:
+ self.on_disconnect()
+
+ logger.debug("Tautulli WebSocket :: Leaving thread.")
-def receive(ws):
- frame = ws.recv_frame()
+ def receive(self, ws):
+ frame = ws.recv_frame()
- if not frame:
- raise websocket.WebSocketException("Not a valid frame %s" % frame)
- elif frame.opcode in opcode_data:
- return frame.opcode, frame.data
- elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
- ws.send_close()
- return frame.opcode, None
- elif frame.opcode == websocket.ABNF.OPCODE_PING:
- # logger.debug("Tautulli WebSocket :: Received ping, sending pong.")
- ws.pong("Hi!")
- elif frame.opcode == websocket.ABNF.OPCODE_PONG:
- receive_pong()
+ if not frame:
+ raise websocket.WebSocketException("Not a valid frame %s" % frame)
+ elif frame.opcode in opcode_data:
+ return frame.opcode, frame.data
+ elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
+ ws.send_close()
+ return frame.opcode, None
+ elif frame.opcode == websocket.ABNF.OPCODE_PING:
+ # logger.debug("Tautulli WebSocket :: Received ping, sending pong.")
+ ws.pong("Hi!")
+ elif frame.opcode == websocket.ABNF.OPCODE_PONG:
+ self.receive_pong()
- return None, None
+ return None, None
-def process(opcode, data):
- if opcode not in opcode_data:
- return False
-
- try:
- data = data.decode('utf-8')
- logger.websocket_debug(data)
- event = json.loads(data)
- except Exception as e:
- logger.warn("Tautulli WebSocket :: Error decoding message from websocket: %s" % e)
- logger.websocket_error(data)
- return False
-
- event = event.get('NotificationContainer', event)
- event_type = event.get('type')
-
- if not event_type:
- return False
-
- if event_type == 'playing':
- event_data = event.get('PlaySessionStateNotification', event.get('_children', {}))
-
- if not event_data:
- logger.debug("Tautulli WebSocket :: Session event found but unable to get websocket data.")
+ def process(self, opcode, data):
+ if opcode not in opcode_data:
return False
try:
- activity = activity_handler.ActivityHandler(timeline=event_data[0])
- activity.process()
+ data = data.decode('utf-8')
+ logger.websocket_debug(data)
+ event = json.loads(data)
except Exception as e:
- logger.exception("Tautulli WebSocket :: Failed to process session data: %s." % e)
-
- if event_type == 'timeline':
- event_data = event.get('TimelineEntry', event.get('_children', {}))
-
- if not event_data:
- logger.debug("Tautulli WebSocket :: Timeline event found but unable to get websocket data.")
+ logger.warn("Tautulli WebSocket :: Error decoding message from websocket: %s" % e)
+ logger.websocket_error(data)
return False
- try:
- activity = activity_handler.TimelineHandler(timeline=event_data[0])
- activity.process()
- except Exception as e:
- logger.exception("Tautulli WebSocket :: Failed to process timeline data: %s." % e)
+ event = event.get('NotificationContainer', event)
+ event_type = event.get('type')
- if event_type == 'reachability':
- event_data = event.get('ReachabilityNotification', event.get('_children', {}))
-
- if not event_data:
- logger.debug("Tautulli WebSocket :: Reachability event found but unable to get websocket data.")
+ if not event_type:
return False
- try:
- activity = activity_handler.ReachabilityHandler(data=event_data[0])
- activity.process()
- except Exception as e:
- logger.exception("Tautulli WebSocket :: Failed to process reachability data: %s." % e)
+ if event_type == 'playing':
+ event_data = event.get('PlaySessionStateNotification', event.get('_children', {}))
- return True
+ if not event_data:
+ logger.debug("Tautulli WebSocket :: Session event found but unable to get websocket data.")
+ return False
+
+ try:
+ activity = activity_handler.ActivityHandler(timeline=event_data[0], server_id=self.server_id)
+ activity.process()
+ except Exception as e:
+ logger.exception("Tautulli WebSocket :: Failed to process session data: %s." % e)
+
+ if event_type == 'timeline':
+ event_data = event.get('TimelineEntry', event.get('_children', {}))
+
+ if not event_data:
+ logger.debug("Tautulli WebSocket :: Timeline event found but unable to get websocket data.")
+ return False
+
+ try:
+ activity = activity_handler.TimelineHandler(timeline=event_data[0], server_id=self.server_id)
+ activity.process()
+ except Exception as e:
+ logger.exception("Tautulli WebSocket :: Failed to process timeline data: %s." % e)
+
+ if event_type == 'reachability':
+ event_data = event.get('ReachabilityNotification', event.get('_children', {}))
+
+ if not event_data:
+ logger.debug("Tautulli WebSocket :: Reachability event found but unable to get websocket data.")
+ return False
+
+ try:
+ activity = activity_handler.ReachabilityHandler(data=event_data[0], server_id=self.server_id)
+ activity.process()
+ except Exception as e:
+ logger.exception("Tautulli WebSocket :: Failed to process reachability data: %s." % e)
+
+ return True
diff --git a/plexpy/webserve.py b/plexpy/webserve.py
index 044ac5a6..b80807ae 100644
--- a/plexpy/webserve.py
+++ b/plexpy/webserve.py
@@ -393,7 +393,7 @@ class WebInterface(object):
```
"""
pms_connect = server_manager.ServerManger().get_server(server_id=server_id)
- result = pms_connect.terminate_session(session_key=session_key, session_id=session_id, message=message)
+ result = pms_connect.terminate_session(session_key=session_key, session_id=session_id, message=message, server_id=server_id)
if isinstance(result, str):
return {'result': 'error', 'message': 'Failed to terminate session: {}.'.format(result)}
@@ -6962,7 +6962,7 @@ class WebInterface(object):
```
"""
cherrypy.response.headers['Cache-Control'] = "max-age=0,no-cache,no-store"
- status = {'result': 'success', 'connected': plexpy.PLEX_SERVER_UP}
+ status = {'result': 'success', 'connected': web_socket.isServerUp()}
return status