mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-08-22 06:13:25 -07:00
multiple websocket connections
This commit is contained in:
parent
3c728c6372
commit
9cd3758d15
10 changed files with 341 additions and 299 deletions
|
@ -5,7 +5,7 @@
|
|||
</%def>
|
||||
|
||||
<%def name="body()">
|
||||
<% from plexpy import PLEX_SERVER_UP %>
|
||||
<% from plexpy import web_socket %>
|
||||
<div class="container-fluid">
|
||||
% for section in config['home_sections']:
|
||||
% if section == 'current_activity':
|
||||
|
@ -23,7 +23,7 @@
|
|||
</h3>
|
||||
</div>
|
||||
<div id="currentActivity">
|
||||
% if PLEX_SERVER_UP:
|
||||
% if web_socket.isServerUp():
|
||||
<div class="text-muted" id="dashboard-checking-activity"><i class="fa fa-refresh fa-spin"></i> Checking for activity...</div>
|
||||
% elif config['pms_is_cloud']:
|
||||
<div id="dashboard-no-activity" class="text-muted">Plex Cloud server is sleeping.</div>
|
||||
|
@ -124,7 +124,7 @@
|
|||
<div class="row">
|
||||
<div class="col-md-12">
|
||||
<div id="recentlyAdded" style="margin-right: -15px;">
|
||||
% if PLEX_SERVER_UP:
|
||||
% if web_socket.isServerUp():
|
||||
<div id="dashboard-checking-recently-added" class="text-muted"><i class="fa fa-refresh fa-spin"></i> Looking for new items...</div>
|
||||
% elif config['pms_is_cloud']:
|
||||
<div class="text-muted">Plex Cloud server is sleeping.</div>
|
||||
|
|
|
@ -142,11 +142,6 @@ AUTH_ENABLED = None
|
|||
|
||||
DEV = False
|
||||
|
||||
WEBSOCKET = None
|
||||
WS_CONNECTED = False
|
||||
PLEX_SERVER_UP = None
|
||||
PLEX_REMOTE_ACCESS_UP = None
|
||||
|
||||
TRACKER = None
|
||||
|
||||
WIN_SYS_TRAY_ICON = None
|
||||
|
@ -472,7 +467,7 @@ def initialize_scheduler():
|
|||
schedule_job(config.make_backup, 'Backup Tautulli config',
|
||||
hours=backup_hours, minutes=0, seconds=0, args=(True, True))
|
||||
|
||||
if WS_CONNECTED and CONFIG.PMS_IP and CONFIG.PMS_TOKEN:
|
||||
if web_socket.isServerUp() and CONFIG.PMS_IP and CONFIG.PMS_TOKEN:
|
||||
schedule_job(plextv.get_server_resources, 'Refresh Plex server URLs',
|
||||
hours=12 * (not bool(CONFIG.PMS_URL_MANUAL)), minutes=0, seconds=0)
|
||||
|
||||
|
@ -490,8 +485,8 @@ def initialize_scheduler():
|
|||
|
||||
schedule_job(activity_pinger.connect_server, 'Check for server response',
|
||||
hours=0, minutes=0, seconds=0)
|
||||
schedule_job(web_socket.send_ping, 'Websocket ping',
|
||||
hours=0, minutes=0, seconds=10 * bool(CONFIG.WEBSOCKET_MONITOR_PING_PONG))
|
||||
# schedule_job(web_socket.send_ping, 'Websocket ping',
|
||||
# hours=0, minutes=0, seconds=10 * bool(CONFIG.WEBSOCKET_MONITOR_PING_PONG))
|
||||
|
||||
else:
|
||||
# Cancel all jobs
|
||||
|
|
|
@ -50,7 +50,7 @@ RECENTLY_ADDED_QUEUE = {}
|
|||
|
||||
class ActivityHandler(object):
|
||||
|
||||
def __init__(self, timeline):
|
||||
def __init__(self, timeline, server_id):
|
||||
self.ap = activity_processor.ActivityProcessor()
|
||||
self.timeline = timeline
|
||||
|
||||
|
@ -70,19 +70,20 @@ class ActivityHandler(object):
|
|||
self.db_session = None
|
||||
self.session = None
|
||||
self.metadata = None
|
||||
self.server_id = server_id
|
||||
|
||||
def get_db_session(self):
|
||||
# Retrieve the session data from our temp table
|
||||
self.db_session = self.ap.get_session_by_key(session_key=self.session_key)
|
||||
self.db_session = self.ap.get_session_by_key(session_key=self.session_key, server_id=self.server_id)
|
||||
|
||||
def get_metadata(self, skip_cache=False):
|
||||
if self.metadata is None:
|
||||
cache_key = None if skip_cache else self.session_key
|
||||
for pms_connect in server_manager.ServerManger().get_server_list():
|
||||
metadata = pms_connect.get_metadata_details(rating_key=self.rating_key, cache_key=cache_key)
|
||||
pms_connect = server_manager.ServerManger().get_server(server_id=self.server_id)
|
||||
metadata = pms_connect.get_metadata_details(rating_key=self.rating_key, cache_key=cache_key)
|
||||
|
||||
if metadata:
|
||||
self.metadata = metadata
|
||||
if metadata:
|
||||
self.metadata = metadata
|
||||
|
||||
def get_live_session(self, skip_cache=False):
|
||||
for pms_connect in server_manager.ServerManger().get_server_list():
|
||||
|
@ -439,7 +440,7 @@ class ActivityHandler(object):
|
|||
|
||||
class TimelineHandler(object):
|
||||
|
||||
def __init__(self, timeline):
|
||||
def __init__(self, timeline, server_id=None):
|
||||
self.timeline = timeline
|
||||
|
||||
self.rating_key = None
|
||||
|
@ -458,6 +459,7 @@ class TimelineHandler(object):
|
|||
self.metadata_state = self.timeline.get('metadataState')
|
||||
self.media_state = self.timeline.get('mediaState')
|
||||
self.queue_size = self.timeline.get('queueSize')
|
||||
self.server_id = server_id
|
||||
|
||||
# This function receives events from our websocket connection
|
||||
def process(self):
|
||||
|
@ -546,10 +548,11 @@ class TimelineHandler(object):
|
|||
|
||||
class ReachabilityHandler(object):
|
||||
|
||||
def __init__(self, data):
|
||||
def __init__(self, data, server_id=None):
|
||||
self.data = data
|
||||
|
||||
self.is_reachable = self.data.get('reachable', False)
|
||||
self.server_id = server_id
|
||||
|
||||
def remote_access_enabled(self):
|
||||
for pms_connect in server_manager.ServerManger().get_server_list():
|
||||
|
|
|
@ -252,8 +252,8 @@ def connect_server(log=True, startup=False):
|
|||
logger.info("Tautulli Monitor :: Attempting to reconnect Plex server...")
|
||||
|
||||
try:
|
||||
web_socket.start_thread()
|
||||
except Exception as e:
|
||||
web_socket.start_threads()
|
||||
except Exception as e:
|
||||
logger.error("Websocket :: Unable to open connection: %s." % e)
|
||||
|
||||
|
||||
|
|
|
@ -193,7 +193,7 @@ class ActivityProcessor(object):
|
|||
user_details = user_data.get_details(user_id=session['user_id'])
|
||||
|
||||
library_data = libraries.Libraries()
|
||||
library_details = library_data.get_details(section_id=section_id)
|
||||
library_details = library_data.get_details(section_id=section_id, server_id=session['server_id'])
|
||||
|
||||
# Return false if failed to retrieve user or library details
|
||||
if not user_details or not library_details:
|
||||
|
@ -322,7 +322,8 @@ class ActivityProcessor(object):
|
|||
'view_offset': session['view_offset'],
|
||||
'section_id': metadata['section_id'],
|
||||
'secure': session['secure'],
|
||||
'relayed': session['relayed']
|
||||
'relayed': session['relayed'],
|
||||
'server_id': session['server_id']
|
||||
}
|
||||
|
||||
# logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history transaction..."
|
||||
|
@ -575,21 +576,21 @@ class ActivityProcessor(object):
|
|||
sessions = self.db.select(query, args)
|
||||
return sessions
|
||||
|
||||
def get_session_by_key(self, session_key=None):
|
||||
def get_session_by_key(self, session_key=None, server_id=None):
|
||||
if str(session_key).isdigit():
|
||||
session = self.db.select_single("SELECT * FROM sessions "
|
||||
"WHERE session_key = ? ",
|
||||
args=[session_key])
|
||||
"WHERE session_key = ? AND server_id = ? ",
|
||||
args=[session_key, server_id])
|
||||
if session:
|
||||
return session
|
||||
|
||||
return None
|
||||
|
||||
def get_session_by_id(self, session_id=None):
|
||||
if session_id:
|
||||
def get_session_by_id(self, session_id=None, server_id=None):
|
||||
if session_id and server_id:
|
||||
session = self.db.select_single("SELECT * FROM sessions "
|
||||
"WHERE session_id = ? ",
|
||||
args=[session_id])
|
||||
"WHERE session_id = ? AND server_id = ? ",
|
||||
args=[session_id, server_id])
|
||||
if session:
|
||||
return session
|
||||
|
||||
|
|
|
@ -44,7 +44,7 @@ else:
|
|||
from plexpy.plex import Plex
|
||||
|
||||
|
||||
def get_server_resources(return_presence=False, return_server=False, return_info=False, **kwargs):
|
||||
def get_server_resources(return_presence=False, return_server=False, return_info=False, return_servers=False, **kwargs):
|
||||
if not return_presence and not return_info:
|
||||
logger.info("Tautulli PlexTV :: Requesting resources for server...")
|
||||
|
||||
|
@ -75,11 +75,18 @@ def get_server_resources(return_presence=False, return_server=False, return_info
|
|||
else:
|
||||
scheme = 'http'
|
||||
|
||||
plex_tv = PlexTV()
|
||||
|
||||
if return_servers:
|
||||
return plex_tv.get_servers_connections(pms_identifier=server['pms_identifier'],
|
||||
pms_ip=server['pms_ip'],
|
||||
pms_port=server['pms_port'],
|
||||
include_https=server['pms_ssl'])
|
||||
|
||||
fallback_url = '{scheme}://{hostname}:{port}'.format(scheme=scheme,
|
||||
hostname=server['pms_ip'],
|
||||
port=server['pms_port'])
|
||||
|
||||
plex_tv = PlexTV()
|
||||
result = plex_tv.get_server_connections(pms_identifier=server['pms_identifier'],
|
||||
pms_ip=server['pms_ip'],
|
||||
pms_port=server['pms_port'],
|
||||
|
@ -591,6 +598,14 @@ class PlexTV(object):
|
|||
return response.ok
|
||||
|
||||
def get_server_connections(self, pms_identifier='', pms_ip='', pms_port=32400, include_https=True):
|
||||
servers = self.get_servers_connections(pms_identifier, pms_ip, pms_port, include_https)
|
||||
for server in servers:
|
||||
if server['pms_identifier'] == pms_identifier:
|
||||
return server
|
||||
return None
|
||||
|
||||
|
||||
def get_servers_connections(self, pms_identifier='', pms_ip='', pms_port=32400, include_https=True):
|
||||
|
||||
if not pms_identifier:
|
||||
logger.error("Tautulli PlexTV :: Unable to retrieve server connections: no pms_identifier provided.")
|
||||
|
@ -629,16 +644,14 @@ class PlexTV(object):
|
|||
server['connections'] = conn
|
||||
return server
|
||||
|
||||
server = {}
|
||||
servers = []
|
||||
|
||||
# Try to match the device
|
||||
for a in xml_head:
|
||||
if helpers.get_xml_attr(a, 'clientIdentifier') == pms_identifier:
|
||||
server = get_connections(a)
|
||||
break
|
||||
servers.append(get_connections(a))
|
||||
|
||||
# Else no device match found
|
||||
if not server:
|
||||
if not len(servers):
|
||||
# Try to match the PMS_IP and PMS_PORT
|
||||
for a in xml_head:
|
||||
if helpers.get_xml_attr(a, 'provides') == 'server':
|
||||
|
@ -647,13 +660,10 @@ class PlexTV(object):
|
|||
for connection in connections:
|
||||
if helpers.get_xml_attr(connection, 'address') == pms_ip and \
|
||||
helpers.get_xml_attr(connection, 'port') == str(pms_port):
|
||||
server = get_connections(a)
|
||||
servers.append(get_connections(a))
|
||||
break
|
||||
|
||||
if server.get('connections'):
|
||||
break
|
||||
|
||||
return server
|
||||
return servers
|
||||
|
||||
def get_server_times(self):
|
||||
servers = self.get_plextv_server_list(output_format='xml')
|
||||
|
|
|
@ -74,7 +74,7 @@ class PmsConnect(object):
|
|||
Retrieve data from Plex Server
|
||||
"""
|
||||
|
||||
def __init__(self, url=None, token=None):
|
||||
def __init__(self, server_id=None, url=None, token=None):
|
||||
self.url = url
|
||||
self.token = token
|
||||
|
||||
|
@ -85,6 +85,8 @@ class PmsConnect(object):
|
|||
port=plexpy.CONFIG.PMS_PORT)
|
||||
self.timeout = plexpy.CONFIG.PMS_TIMEOUT
|
||||
|
||||
self.server_id = server_id
|
||||
|
||||
if not self.token:
|
||||
# Check if we should use the admin token, or the guest server token
|
||||
if session.get_session_user_id():
|
||||
|
@ -116,7 +118,7 @@ class PmsConnect(object):
|
|||
|
||||
return request
|
||||
|
||||
def get_sessions_terminate(self, session_id='', reason=''):
|
||||
def get_sessions_terminate(self, session_id='', reason='', server_id=None):
|
||||
"""
|
||||
Return current sessions.
|
||||
|
||||
|
@ -2330,7 +2332,7 @@ class PmsConnect(object):
|
|||
|
||||
return session_output
|
||||
|
||||
def terminate_session(self, session_key='', session_id='', message=''):
|
||||
def terminate_session(self, session_key='', session_id='', message='', server_id=None):
|
||||
"""
|
||||
Terminates a streaming session.
|
||||
"""
|
||||
|
@ -2345,26 +2347,19 @@ class PmsConnect(object):
|
|||
ap = activity_processor.ActivityProcessor()
|
||||
|
||||
if session_key:
|
||||
session = ap.get_session_by_key(session_key=session_key)
|
||||
session = ap.get_session_by_key(session_key=session_key, server_id=server_id)
|
||||
if session and not session_id:
|
||||
session_id = session['session_id']
|
||||
|
||||
elif session_id:
|
||||
session = ap.get_session_by_id(session_id=session_id)
|
||||
if session and not session_key:
|
||||
session_key = session['session_key']
|
||||
|
||||
else:
|
||||
session = session_key = session_id = None
|
||||
|
||||
if not session:
|
||||
msg = 'Invalid session_key (%s) or session_id (%s)' % (session_key, session_id)
|
||||
# you only need session_id to terminate
|
||||
if session_key and not session_id:
|
||||
msg = 'Invalid session_key (%s)' % (session_key)
|
||||
logger.warn("Tautulli Pmsconnect :: Failed to terminate session: %s." % msg)
|
||||
return msg
|
||||
|
||||
if session_id:
|
||||
logger.info("Tautulli Pmsconnect :: Terminating session %s (session_id %s)." % (session_key, session_id))
|
||||
response = self.get_sessions_terminate(session_id=session_id, reason=message)
|
||||
response = self.get_sessions_terminate(session_id=session_id, reason=message, server_id=server_id)
|
||||
return response.ok
|
||||
else:
|
||||
msg = 'Missing session_id'
|
||||
|
|
|
@ -32,7 +32,7 @@ class ServerManger(object):
|
|||
pmsServers = []
|
||||
for server in pmsconnect.PmsConnect().get_servers_info():
|
||||
url = 'http://{hostname}:{port}'.format(hostname=server["host"], port=server["port"])
|
||||
pmsServers.append(pmsconnect.PmsConnect(url=url))
|
||||
pmsServers.append(pmsconnect.PmsConnect(server['machine_identifier'], url=url))
|
||||
return pmsServers
|
||||
|
||||
def get_server(self, server_id):
|
||||
|
@ -40,5 +40,5 @@ class ServerManger(object):
|
|||
for server in pmsconnect.PmsConnect().get_servers_info():
|
||||
if server['machine_identifier'] == server_id:
|
||||
url = 'http://{hostname}:{port}'.format(hostname=server["host"], port=server["port"])
|
||||
return pmsconnect.PmsConnect(url=url)
|
||||
return pmsconnect.PmsConnect(server_id, url=url)
|
||||
return None
|
|
@ -27,6 +27,7 @@ import time
|
|||
|
||||
import certifi
|
||||
import websocket
|
||||
from websocket import create_connection
|
||||
|
||||
import plexpy
|
||||
if plexpy.PYTHON2:
|
||||
|
@ -35,6 +36,7 @@ if plexpy.PYTHON2:
|
|||
import activity_processor
|
||||
import database
|
||||
import logger
|
||||
import plextv
|
||||
import server_manager
|
||||
else:
|
||||
from plexpy import activity_handler
|
||||
|
@ -42,6 +44,7 @@ else:
|
|||
from plexpy import activity_processor
|
||||
from plexpy import database
|
||||
from plexpy import logger
|
||||
from plexpy import plextv
|
||||
from plexpy import server_manager
|
||||
|
||||
|
||||
|
@ -52,7 +55,10 @@ pong_timer = None
|
|||
pong_count = 0
|
||||
|
||||
|
||||
def start_thread():
|
||||
def isServerUp():
|
||||
return True
|
||||
|
||||
def start_threads():
|
||||
try:
|
||||
# Check for any existing sessions on start up
|
||||
activity_pinger.check_active_sessions(ws_request=True)
|
||||
|
@ -61,270 +67,302 @@ def start_thread():
|
|||
logger.warn("Tautulli WebSocket :: Attempt to fix by flushing temporary sessions...")
|
||||
database.delete_sessions()
|
||||
|
||||
# Start the websocket listener on it's own thread
|
||||
thread = threading.Thread(target=run)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
plex_servers = plextv.get_server_resources(return_servers=True)
|
||||
|
||||
owned_servers = server_manager.ServerManger().get_server_list()
|
||||
|
||||
|
||||
def on_connect():
|
||||
if plexpy.PLEX_SERVER_UP is None:
|
||||
plexpy.PLEX_SERVER_UP = True
|
||||
|
||||
if not plexpy.PLEX_SERVER_UP:
|
||||
logger.info("Tautulli WebSocket :: The Plex Media Server is back up.")
|
||||
plexpy.PLEX_SERVER_UP = True
|
||||
|
||||
if activity_handler.ACTIVITY_SCHED.get_job('on_intdown'):
|
||||
logger.debug("Tautulli WebSocket :: Cancelling scheduled Plex server down callback.")
|
||||
activity_handler.schedule_callback('on_intdown', remove_job=True)
|
||||
else:
|
||||
on_intup()
|
||||
|
||||
plexpy.initialize_scheduler()
|
||||
if plexpy.CONFIG.WEBSOCKET_MONITOR_PING_PONG:
|
||||
send_ping()
|
||||
|
||||
|
||||
def on_disconnect():
|
||||
if plexpy.PLEX_SERVER_UP is None:
|
||||
plexpy.PLEX_SERVER_UP = False
|
||||
|
||||
if plexpy.PLEX_SERVER_UP:
|
||||
logger.info("Tautulli WebSocket :: Unable to get a response from the server, Plex server is down.")
|
||||
plexpy.PLEX_SERVER_UP = False
|
||||
|
||||
logger.debug("Tautulli WebSocket :: Scheduling Plex server down callback in %d seconds.",
|
||||
plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
|
||||
activity_handler.schedule_callback('on_intdown', func=on_intdown,
|
||||
seconds=plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
|
||||
|
||||
activity_processor.ActivityProcessor().set_temp_stopped()
|
||||
plexpy.initialize_scheduler()
|
||||
|
||||
|
||||
def on_intdown():
|
||||
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
|
||||
|
||||
|
||||
def on_intup():
|
||||
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'})
|
||||
|
||||
|
||||
def reconnect():
|
||||
close()
|
||||
logger.info("Tautulli WebSocket :: Reconnecting websocket...")
|
||||
start_thread()
|
||||
|
||||
|
||||
def shutdown():
|
||||
global ws_shutdown
|
||||
ws_shutdown = True
|
||||
close()
|
||||
|
||||
|
||||
def close():
|
||||
logger.info("Tautulli WebSocket :: Disconnecting websocket...")
|
||||
plexpy.WEBSOCKET.close()
|
||||
plexpy.WS_CONNECTED = False
|
||||
|
||||
|
||||
def send_ping():
|
||||
if plexpy.WS_CONNECTED:
|
||||
# logger.debug("Tautulli WebSocket :: Sending ping.")
|
||||
plexpy.WEBSOCKET.ping("Hi?")
|
||||
|
||||
global pong_timer
|
||||
pong_timer = threading.Timer(5.0, wait_pong)
|
||||
pong_timer.daemon = True
|
||||
pong_timer.start()
|
||||
|
||||
|
||||
def wait_pong():
|
||||
global pong_count
|
||||
pong_count += 1
|
||||
|
||||
logger.warn("Tautulli WebSocket :: Failed to receive pong from websocket, ping attempt %s." % str(pong_count))
|
||||
|
||||
if pong_count >= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
|
||||
pong_count = 0
|
||||
close()
|
||||
|
||||
|
||||
def receive_pong():
|
||||
# logger.debug("Tautulli WebSocket :: Received pong.")
|
||||
global pong_timer
|
||||
global pong_count
|
||||
if pong_timer:
|
||||
pong_timer = pong_timer.cancel()
|
||||
pong_count = 0
|
||||
|
||||
|
||||
def run():
|
||||
from websocket import create_connection
|
||||
|
||||
if plexpy.CONFIG.PMS_SSL and plexpy.CONFIG.PMS_URL[:5] == 'https':
|
||||
uri = plexpy.CONFIG.PMS_URL.replace('https://', 'wss://') + '/:/websockets/notifications'
|
||||
secure = 'secure '
|
||||
if plexpy.CONFIG.VERIFY_SSL_CERT:
|
||||
sslopt = {'ca_certs': certifi.where()}
|
||||
else:
|
||||
sslopt = {'cert_reqs': ssl.CERT_NONE}
|
||||
else:
|
||||
uri = 'ws://%s:%s/:/websockets/notifications' % (
|
||||
plexpy.CONFIG.PMS_IP,
|
||||
plexpy.CONFIG.PMS_PORT
|
||||
)
|
||||
secure = ''
|
||||
sslopt = None
|
||||
|
||||
# Set authentication token (if one is available)
|
||||
if plexpy.CONFIG.PMS_TOKEN:
|
||||
header = {"X-Plex-Token": plexpy.CONFIG.PMS_TOKEN}
|
||||
else:
|
||||
header = None
|
||||
|
||||
timeout = plexpy.CONFIG.PMS_TIMEOUT
|
||||
|
||||
global ws_shutdown
|
||||
ws_shutdown = False
|
||||
reconnects = 0
|
||||
|
||||
# Try an open the websocket connection
|
||||
logger.info("Tautulli WebSocket :: Opening %swebsocket." % secure)
|
||||
try:
|
||||
plexpy.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
|
||||
logger.info("Tautulli WebSocket :: Ready")
|
||||
plexpy.WS_CONNECTED = True
|
||||
except (websocket.WebSocketException, IOError, Exception) as e:
|
||||
logger.error("Tautulli WebSocket :: %s.", e)
|
||||
|
||||
if plexpy.WS_CONNECTED:
|
||||
on_connect()
|
||||
|
||||
while plexpy.WS_CONNECTED:
|
||||
try:
|
||||
process(*receive(plexpy.WEBSOCKET))
|
||||
|
||||
# successfully received data, reset reconnects counter
|
||||
reconnects = 0
|
||||
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
if ws_shutdown:
|
||||
# Start each websocket listener on it's own thread per server
|
||||
for owned_server in owned_servers:
|
||||
for server in plex_servers:
|
||||
if owned_server.server_id == server['pms_identifier']:
|
||||
for connection in server['connections']:
|
||||
if connection['local']:
|
||||
wss=WebSocketServer(connection, owned_server.server_id)
|
||||
thread = threading.Thread(target=wss.run)
|
||||
thread.daemon = True
|
||||
thread.start()
|
||||
break
|
||||
break
|
||||
|
||||
if reconnects == 0:
|
||||
logger.warn("Tautulli WebSocket :: Connection has closed.")
|
||||
|
||||
if not plexpy.CONFIG.PMS_IS_CLOUD and reconnects < plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
|
||||
reconnects += 1
|
||||
class WebSocketServer(object):
|
||||
def __init__(self, server, server_id):
|
||||
self.server=server
|
||||
self.WEBSOCKET = None
|
||||
self.WS_CONNECTED = False
|
||||
self.PLEX_SERVER_UP = None
|
||||
self.PLEX_REMOTE_ACCESS_UP = None
|
||||
self.server_id = server_id
|
||||
|
||||
# Sleep 5 between connection attempts
|
||||
if reconnects > 1:
|
||||
time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT)
|
||||
def on_connect(self):
|
||||
if self.PLEX_SERVER_UP is None:
|
||||
self.PLEX_SERVER_UP = True
|
||||
|
||||
logger.warn("Tautulli WebSocket :: Reconnection attempt %s." % str(reconnects))
|
||||
|
||||
try:
|
||||
plexpy.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
|
||||
logger.info("Tautulli WebSocket :: Ready")
|
||||
plexpy.WS_CONNECTED = True
|
||||
except (websocket.WebSocketException, IOError, Exception) as e:
|
||||
logger.error("Tautulli WebSocket :: %s.", e)
|
||||
if not self.PLEX_SERVER_UP:
|
||||
logger.info("Tautulli WebSocket :: The Plex Media Server is back up.")
|
||||
self.PLEX_SERVER_UP = True
|
||||
|
||||
if activity_handler.ACTIVITY_SCHED.get_job('on_intdown'):
|
||||
logger.debug("Tautulli WebSocket :: Cancelling scheduled Plex server down callback.")
|
||||
activity_handler.schedule_callback('on_intdown', remove_job=True)
|
||||
else:
|
||||
close()
|
||||
break
|
||||
self.on_intup()
|
||||
|
||||
except (websocket.WebSocketException, Exception) as e:
|
||||
if ws_shutdown:
|
||||
break
|
||||
plexpy.initialize_scheduler()
|
||||
if plexpy.CONFIG.WEBSOCKET_MONITOR_PING_PONG:
|
||||
self.send_ping()
|
||||
|
||||
|
||||
def on_disconnect(self):
|
||||
if self.PLEX_SERVER_UP is None:
|
||||
self.PLEX_SERVER_UP = False
|
||||
|
||||
if self.PLEX_SERVER_UP:
|
||||
logger.info("Tautulli WebSocket :: Unable to get a response from the server, Plex server is down.")
|
||||
self.PLEX_SERVER_UP = False
|
||||
|
||||
logger.debug("Tautulli WebSocket :: Scheduling Plex server down callback in %d seconds.",
|
||||
plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
|
||||
activity_handler.schedule_callback('on_intdown', func=self.on_intdown,
|
||||
seconds=plexpy.CONFIG.NOTIFY_SERVER_CONNECTION_THRESHOLD)
|
||||
|
||||
activity_processor.ActivityProcessor().set_temp_stopped()
|
||||
plexpy.initialize_scheduler()
|
||||
|
||||
|
||||
def on_intdown(self):
|
||||
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intdown'})
|
||||
|
||||
|
||||
def on_intup(self):
|
||||
plexpy.NOTIFY_QUEUE.put({'notify_action': 'on_intup'})
|
||||
|
||||
|
||||
def reconnect(self):
|
||||
self.close()
|
||||
logger.info("Tautulli WebSocket :: Reconnecting websocket...")
|
||||
self.run()
|
||||
|
||||
|
||||
def shutdown(self):
|
||||
global ws_shutdown
|
||||
ws_shutdown = True
|
||||
self.close()
|
||||
|
||||
|
||||
def close(self):
|
||||
logger.info("Tautulli WebSocket :: Disconnecting websocket...")
|
||||
self.WEBSOCKET.close()
|
||||
self.WS_CONNECTED = False
|
||||
|
||||
|
||||
def send_ping(self):
|
||||
if self.WS_CONNECTED:
|
||||
# logger.debug("Tautulli WebSocket :: Sending ping.")
|
||||
self.WEBSOCKET.ping("Hi?")
|
||||
|
||||
global pong_timer
|
||||
pong_timer = threading.Timer(5.0, self.wait_pong)
|
||||
pong_timer.daemon = True
|
||||
pong_timer.start()
|
||||
|
||||
|
||||
def wait_pong(self):
|
||||
global pong_count
|
||||
pong_count += 1
|
||||
|
||||
logger.warn("Tautulli WebSocket :: Failed to receive pong from websocket, ping attempt %s." % str(pong_count))
|
||||
|
||||
if pong_count >= plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
|
||||
pong_count = 0
|
||||
self.close()
|
||||
|
||||
|
||||
def receive_pong(self):
|
||||
# logger.debug("Tautulli WebSocket :: Received pong.")
|
||||
global pong_timer
|
||||
global pong_count
|
||||
if pong_timer:
|
||||
pong_timer = pong_timer.cancel()
|
||||
pong_count = 0
|
||||
|
||||
|
||||
def run(self):
|
||||
|
||||
if plexpy.CONFIG.PMS_SSL:
|
||||
uri = ""
|
||||
if self.server:
|
||||
uri = self.server['uri'].replace('https://', 'wss://') + '/:/websockets/notifications'
|
||||
else:
|
||||
uri = plexpy.CONFIG.PMS_URL.replace('https://', 'wss://') + '/:/websockets/notifications'
|
||||
secure = 'secure '
|
||||
if plexpy.CONFIG.VERIFY_SSL_CERT:
|
||||
sslopt = {'ca_certs': certifi.where()}
|
||||
else:
|
||||
sslopt = {'cert_reqs': ssl.CERT_NONE}
|
||||
else:
|
||||
uri = ""
|
||||
if self.server:
|
||||
uri = 'ws://%s:%s/:/websockets/notifications' % (
|
||||
self.server['address'],
|
||||
self.server['port']
|
||||
)
|
||||
else:
|
||||
uri = 'ws://%s:%s/:/websockets/notifications' % (
|
||||
plexpy.CONFIG.PMS_IP,
|
||||
plexpy.CONFIG.PMS_PORT
|
||||
)
|
||||
secure = ''
|
||||
sslopt = None
|
||||
|
||||
# Set authentication token (if one is available)
|
||||
if plexpy.CONFIG.PMS_TOKEN:
|
||||
header = {"X-Plex-Token": plexpy.CONFIG.PMS_TOKEN}
|
||||
else:
|
||||
header = None
|
||||
|
||||
timeout = plexpy.CONFIG.PMS_TIMEOUT
|
||||
|
||||
global ws_shutdown
|
||||
ws_shutdown = False
|
||||
reconnects = 0
|
||||
|
||||
# Try an open the websocket connection
|
||||
logger.info("Tautulli WebSocket :: Opening %swebsocket." % secure)
|
||||
try:
|
||||
self.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
|
||||
logger.info("Tautulli WebSocket :: Ready")
|
||||
self.WS_CONNECTED = True
|
||||
except (websocket.WebSocketException, IOError, Exception) as e:
|
||||
logger.error("Tautulli WebSocket :: %s.", e)
|
||||
close()
|
||||
break
|
||||
|
||||
if not plexpy.WS_CONNECTED and not ws_shutdown:
|
||||
on_disconnect()
|
||||
if self.WS_CONNECTED:
|
||||
self.on_connect()
|
||||
|
||||
logger.debug("Tautulli WebSocket :: Leaving thread.")
|
||||
while self.WS_CONNECTED:
|
||||
try:
|
||||
self.process(*self.receive(self.WEBSOCKET))
|
||||
|
||||
# successfully received data, reset reconnects counter
|
||||
reconnects = 0
|
||||
|
||||
except websocket.WebSocketConnectionClosedException:
|
||||
if ws_shutdown:
|
||||
break
|
||||
|
||||
if reconnects == 0:
|
||||
logger.warn("Tautulli WebSocket :: Connection has closed.")
|
||||
|
||||
if not plexpy.CONFIG.PMS_IS_CLOUD and reconnects < plexpy.CONFIG.WEBSOCKET_CONNECTION_ATTEMPTS:
|
||||
reconnects += 1
|
||||
|
||||
# Sleep 5 between connection attempts
|
||||
if reconnects > 1:
|
||||
time.sleep(plexpy.CONFIG.WEBSOCKET_CONNECTION_TIMEOUT)
|
||||
|
||||
logger.warn("Tautulli WebSocket :: Reconnection attempt %s." % str(reconnects))
|
||||
|
||||
try:
|
||||
self.WEBSOCKET = create_connection(uri, timeout=timeout, header=header, sslopt=sslopt)
|
||||
logger.info("Tautulli WebSocket :: Ready")
|
||||
self.WS_CONNECTED = True
|
||||
except (websocket.WebSocketException, IOError, Exception) as e:
|
||||
logger.error("Tautulli WebSocket :: %s.", e)
|
||||
|
||||
else:
|
||||
self.close()
|
||||
break
|
||||
|
||||
except (websocket.WebSocketException, Exception) as e:
|
||||
if ws_shutdown:
|
||||
break
|
||||
|
||||
logger.error("Tautulli WebSocket :: %s.", e)
|
||||
self.close()
|
||||
break
|
||||
|
||||
if not self.WS_CONNECTED and not ws_shutdown:
|
||||
self.on_disconnect()
|
||||
|
||||
logger.debug("Tautulli WebSocket :: Leaving thread.")
|
||||
|
||||
|
||||
def receive(ws):
|
||||
frame = ws.recv_frame()
|
||||
def receive(self, ws):
|
||||
frame = ws.recv_frame()
|
||||
|
||||
if not frame:
|
||||
raise websocket.WebSocketException("Not a valid frame %s" % frame)
|
||||
elif frame.opcode in opcode_data:
|
||||
return frame.opcode, frame.data
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
|
||||
ws.send_close()
|
||||
return frame.opcode, None
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_PING:
|
||||
# logger.debug("Tautulli WebSocket :: Received ping, sending pong.")
|
||||
ws.pong("Hi!")
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_PONG:
|
||||
receive_pong()
|
||||
if not frame:
|
||||
raise websocket.WebSocketException("Not a valid frame %s" % frame)
|
||||
elif frame.opcode in opcode_data:
|
||||
return frame.opcode, frame.data
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
|
||||
ws.send_close()
|
||||
return frame.opcode, None
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_PING:
|
||||
# logger.debug("Tautulli WebSocket :: Received ping, sending pong.")
|
||||
ws.pong("Hi!")
|
||||
elif frame.opcode == websocket.ABNF.OPCODE_PONG:
|
||||
self.receive_pong()
|
||||
|
||||
return None, None
|
||||
return None, None
|
||||
|
||||
|
||||
def process(opcode, data):
|
||||
if opcode not in opcode_data:
|
||||
return False
|
||||
|
||||
try:
|
||||
data = data.decode('utf-8')
|
||||
logger.websocket_debug(data)
|
||||
event = json.loads(data)
|
||||
except Exception as e:
|
||||
logger.warn("Tautulli WebSocket :: Error decoding message from websocket: %s" % e)
|
||||
logger.websocket_error(data)
|
||||
return False
|
||||
|
||||
event = event.get('NotificationContainer', event)
|
||||
event_type = event.get('type')
|
||||
|
||||
if not event_type:
|
||||
return False
|
||||
|
||||
if event_type == 'playing':
|
||||
event_data = event.get('PlaySessionStateNotification', event.get('_children', {}))
|
||||
|
||||
if not event_data:
|
||||
logger.debug("Tautulli WebSocket :: Session event found but unable to get websocket data.")
|
||||
def process(self, opcode, data):
|
||||
if opcode not in opcode_data:
|
||||
return False
|
||||
|
||||
try:
|
||||
activity = activity_handler.ActivityHandler(timeline=event_data[0])
|
||||
activity.process()
|
||||
data = data.decode('utf-8')
|
||||
logger.websocket_debug(data)
|
||||
event = json.loads(data)
|
||||
except Exception as e:
|
||||
logger.exception("Tautulli WebSocket :: Failed to process session data: %s." % e)
|
||||
|
||||
if event_type == 'timeline':
|
||||
event_data = event.get('TimelineEntry', event.get('_children', {}))
|
||||
|
||||
if not event_data:
|
||||
logger.debug("Tautulli WebSocket :: Timeline event found but unable to get websocket data.")
|
||||
logger.warn("Tautulli WebSocket :: Error decoding message from websocket: %s" % e)
|
||||
logger.websocket_error(data)
|
||||
return False
|
||||
|
||||
try:
|
||||
activity = activity_handler.TimelineHandler(timeline=event_data[0])
|
||||
activity.process()
|
||||
except Exception as e:
|
||||
logger.exception("Tautulli WebSocket :: Failed to process timeline data: %s." % e)
|
||||
event = event.get('NotificationContainer', event)
|
||||
event_type = event.get('type')
|
||||
|
||||
if event_type == 'reachability':
|
||||
event_data = event.get('ReachabilityNotification', event.get('_children', {}))
|
||||
|
||||
if not event_data:
|
||||
logger.debug("Tautulli WebSocket :: Reachability event found but unable to get websocket data.")
|
||||
if not event_type:
|
||||
return False
|
||||
|
||||
try:
|
||||
activity = activity_handler.ReachabilityHandler(data=event_data[0])
|
||||
activity.process()
|
||||
except Exception as e:
|
||||
logger.exception("Tautulli WebSocket :: Failed to process reachability data: %s." % e)
|
||||
if event_type == 'playing':
|
||||
event_data = event.get('PlaySessionStateNotification', event.get('_children', {}))
|
||||
|
||||
return True
|
||||
if not event_data:
|
||||
logger.debug("Tautulli WebSocket :: Session event found but unable to get websocket data.")
|
||||
return False
|
||||
|
||||
try:
|
||||
activity = activity_handler.ActivityHandler(timeline=event_data[0], server_id=self.server_id)
|
||||
activity.process()
|
||||
except Exception as e:
|
||||
logger.exception("Tautulli WebSocket :: Failed to process session data: %s." % e)
|
||||
|
||||
if event_type == 'timeline':
|
||||
event_data = event.get('TimelineEntry', event.get('_children', {}))
|
||||
|
||||
if not event_data:
|
||||
logger.debug("Tautulli WebSocket :: Timeline event found but unable to get websocket data.")
|
||||
return False
|
||||
|
||||
try:
|
||||
activity = activity_handler.TimelineHandler(timeline=event_data[0], server_id=self.server_id)
|
||||
activity.process()
|
||||
except Exception as e:
|
||||
logger.exception("Tautulli WebSocket :: Failed to process timeline data: %s." % e)
|
||||
|
||||
if event_type == 'reachability':
|
||||
event_data = event.get('ReachabilityNotification', event.get('_children', {}))
|
||||
|
||||
if not event_data:
|
||||
logger.debug("Tautulli WebSocket :: Reachability event found but unable to get websocket data.")
|
||||
return False
|
||||
|
||||
try:
|
||||
activity = activity_handler.ReachabilityHandler(data=event_data[0], server_id=self.server_id)
|
||||
activity.process()
|
||||
except Exception as e:
|
||||
logger.exception("Tautulli WebSocket :: Failed to process reachability data: %s." % e)
|
||||
|
||||
return True
|
||||
|
|
|
@ -393,7 +393,7 @@ class WebInterface(object):
|
|||
```
|
||||
"""
|
||||
pms_connect = server_manager.ServerManger().get_server(server_id=server_id)
|
||||
result = pms_connect.terminate_session(session_key=session_key, session_id=session_id, message=message)
|
||||
result = pms_connect.terminate_session(session_key=session_key, session_id=session_id, message=message, server_id=server_id)
|
||||
|
||||
if isinstance(result, str):
|
||||
return {'result': 'error', 'message': 'Failed to terminate session: {}.'.format(result)}
|
||||
|
@ -6962,7 +6962,7 @@ class WebInterface(object):
|
|||
```
|
||||
"""
|
||||
cherrypy.response.headers['Cache-Control'] = "max-age=0,no-cache,no-store"
|
||||
status = {'result': 'success', 'connected': plexpy.PLEX_SERVER_UP}
|
||||
status = {'result': 'success', 'connected': web_socket.isServerUp()}
|
||||
|
||||
return status
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue