Merge pull request #199 from drzoidberg33/websocket-testing

Initial Websocket code
This commit is contained in:
drzoidberg33 2015-09-23 00:44:51 +02:00
commit 302ca85dd3
29 changed files with 8354 additions and 205 deletions

View file

@ -31,7 +31,7 @@ except ImportError:
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from plexpy import versioncheck, logger, monitor, plextv
from plexpy import versioncheck, logger, activity_pinger, plextv
import plexpy.config
PROG_DIR = None
@ -71,6 +71,7 @@ COMMITS_BEHIND = None
UMASK = None
POLLING_FAILOVER = False
def initialize(config_file):
with INIT_LOCK:
@ -80,6 +81,7 @@ def initialize(config_file):
global CURRENT_VERSION
global LATEST_VERSION
global UMASK
global POLLING_FAILOVER
CONFIG = plexpy.config.Config(config_file)
@ -279,7 +281,11 @@ def initialize_scheduler():
if CONFIG.PMS_IP and CONFIG.PMS_TOKEN:
schedule_job(plextv.get_real_pms_url, 'Refresh Plex Server URLs', hours=12, minutes=0, seconds=0)
schedule_job(monitor.check_active_sessions, 'Check for active sessions', hours=0, minutes=0, seconds=seconds)
# If we're not using websockets then fall back to polling
if not CONFIG.MONITORING_USE_WEBSOCKET or POLLING_FAILOVER:
schedule_job(activity_pinger.check_active_sessions, 'Check for active sessions',
hours=0, minutes=0, seconds=seconds)
# Refresh the users list
if CONFIG.REFRESH_USERS_INTERVAL:
@ -355,7 +361,7 @@ def dbcheck():
'audio_channels INTEGER, transcode_protocol TEXT, transcode_container TEXT, '
'transcode_video_codec TEXT, transcode_audio_codec TEXT, transcode_audio_channels INTEGER,'
'transcode_width INTEGER, transcode_height INTEGER, buffer_count INTEGER DEFAULT 0, '
'buffer_last_triggered INTEGER)'
'buffer_last_triggered INTEGER, last_paused INTEGER)'
)
# session_history table :: This is a history table which logs essential stream details
@ -603,6 +609,15 @@ def dbcheck():
'ALTER TABLE users ADD COLUMN custom_avatar_url TEXT'
)
# Upgrade sessions table from earlier versions
try:
c_db.execute('SELECT last_paused from sessions')
except sqlite3.OperationalError:
logger.debug(u"Altering database. Updating database table sessions.")
c_db.execute(
'ALTER TABLE sessions ADD COLUMN last_paused INTEGER'
)
# Add "Local" user to database as default unauthenticated user.
result = c_db.execute('SELECT id FROM users WHERE username = "Local"')
if not result.fetchone():
@ -616,10 +631,6 @@ def shutdown(restart=False, update=False):
cherrypy.engine.exit()
SCHED.shutdown(wait=False)
# Clear any sessions in the db - Not sure yet if we should do this. More testing required
# logger.debug(u'Clearing Plex sessions.')
# monitor.drop_session_db()
CONFIG.write()
if not restart and not update:

213
plexpy/activity_handler.py Normal file
View file

@ -0,0 +1,213 @@
# This file is part of PlexPy.
#
# PlexPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PlexPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
import time
import plexpy
from plexpy import logger, pmsconnect, activity_processor, threading, notification_handler
class ActivityHandler(object):
def __init__(self, timeline):
self.timeline = timeline
# print timeline
def is_valid_session(self):
if 'sessionKey' in self.timeline:
if str(self.timeline['sessionKey']).isdigit():
return True
return False
def get_session_key(self):
if self.is_valid_session():
return int(self.timeline['sessionKey'])
return None
def get_live_session(self):
pms_connect = pmsconnect.PmsConnect()
session_list = pms_connect.get_current_activity()
for session in session_list['sessions']:
if int(session['session_key']) == self.get_session_key():
return session
return None
def update_db_session(self):
# Update our session temp table values
monitor_proc = activity_processor.ActivityProcessor()
monitor_proc.write_session(self.get_live_session())
def on_start(self):
if self.is_valid_session():
logger.debug(u"PlexPy ActivityHandler :: Session %s has started." % str(self.get_session_key()))
# Fire off notifications
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=self.get_live_session(), notify_action='play')).start()
# Write the new session to our temp session table
self.update_db_session()
def on_stop(self, force_stop=False):
if self.is_valid_session():
logger.debug(u"PlexPy ActivityHandler :: Session %s has stopped." % str(self.get_session_key()))
# Set the session last_paused timestamp
ap = activity_processor.ActivityProcessor()
ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None)
# Update the session state and viewOffset
# Set force_stop to true to disable the state set
if not force_stop:
ap.set_session_state(session_key=self.get_session_key(),
state=self.timeline['state'],
view_offset=self.timeline['viewOffset'])
# Retrieve the session data from our temp table
db_session = ap.get_session_by_key(session_key=self.get_session_key())
# Fire off notifications
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=db_session, notify_action='stop')).start()
# Write it to the history table
monitor_proc = activity_processor.ActivityProcessor()
monitor_proc.write_session_history(session=db_session)
# Remove the session from our temp session table
ap.delete_session(session_key=self.get_session_key())
def on_pause(self):
if self.is_valid_session():
logger.debug(u"PlexPy ActivityHandler :: Session %s has been paused." % str(self.get_session_key()))
# Set the session last_paused timestamp
ap = activity_processor.ActivityProcessor()
ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=int(time.time()))
# Update the session state and viewOffset
ap.set_session_state(session_key=self.get_session_key(),
state=self.timeline['state'],
view_offset=self.timeline['viewOffset'])
# Retrieve the session data from our temp table
db_session = ap.get_session_by_key(session_key=self.get_session_key())
# Fire off notifications
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=db_session, notify_action='pause')).start()
def on_resume(self):
if self.is_valid_session():
logger.debug(u"PlexPy ActivityHandler :: Session %s has been resumed." % str(self.get_session_key()))
# Set the session last_paused timestamp
ap = activity_processor.ActivityProcessor()
ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None)
# Update the session state and viewOffset
ap.set_session_state(session_key=self.get_session_key(),
state=self.timeline['state'],
view_offset=self.timeline['viewOffset'])
# Retrieve the session data from our temp table
db_session = ap.get_session_by_key(session_key=self.get_session_key())
# Fire off notifications
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=db_session, notify_action='resume')).start()
def on_buffer(self):
if self.is_valid_session():
logger.debug(u"PlexPy ActivityHandler :: Session %s is buffering." % self.get_session_key())
ap = activity_processor.ActivityProcessor()
db_stream = ap.get_session_by_key(session_key=self.get_session_key())
# Increment our buffer count
ap.increment_session_buffer_count(session_key=self.get_session_key())
# Get our current buffer count
current_buffer_count = ap.get_session_buffer_count(self.get_session_key())
logger.debug(u"PlexPy ActivityHandler :: Session %s buffer count is %s." %
(self.get_session_key(), current_buffer_count))
# Get our last triggered time
buffer_last_triggered = ap.get_session_buffer_trigger_time(self.get_session_key())
time_since_last_trigger = 0
if buffer_last_triggered:
logger.debug(u"PlexPy ActivityHandler :: Session %s buffer last triggered at %s." %
(self.get_session_key(), buffer_last_triggered))
time_since_last_trigger = int(time.time()) - int(buffer_last_triggered)
if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \
time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT:
ap.set_session_buffer_trigger_time(session_key=self.get_session_key())
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=db_stream, notify_action='buffer')).start()
# This function receives events from our websocket connection
def process(self):
if self.is_valid_session():
from plexpy import helpers
ap = activity_processor.ActivityProcessor()
db_session = ap.get_session_by_key(session_key=self.get_session_key())
# If we already have this session in the temp table, check for state changes
if db_session:
this_state = self.timeline['state']
last_state = db_session['state']
this_key = str(self.timeline['ratingKey'])
last_key = str(db_session['rating_key'])
# Make sure the same item is being played
if this_key == last_key:
# Update the session state and viewOffset
if this_state == 'playing':
ap.set_session_state(session_key=self.get_session_key(),
state=this_state,
view_offset=self.timeline['viewOffset'])
# Start our state checks
if this_state != last_state:
if this_state == 'paused':
self.on_pause()
elif last_state == 'paused' and this_state == 'playing':
self.on_resume()
elif this_state == 'stopped':
self.on_stop()
elif this_state == 'buffering':
self.on_buffer()
# If a client doesn't register stop events (I'm looking at you PHT!) check if the ratingKey has changed
else:
# Manually stop and start
# Set force_stop so that we don't overwrite our last viewOffset
self.on_stop(force_stop=True)
self.on_start()
# Monitor if the stream has reached the watch percentage for notifications
# The only purpose of this is for notifications
progress_percent = helpers.get_percent(self.timeline['viewOffset'], db_session['duration'])
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT and this_state != 'buffering':
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=db_session, notify_action='watched')).start()
else:
# We don't have this session in our table yet, start a new one.
self.on_start()

164
plexpy/activity_pinger.py Normal file
View file

@ -0,0 +1,164 @@
# This file is part of PlexPy.
#
# PlexPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PlexPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
from plexpy import logger, pmsconnect, notification_handler, database, helpers, activity_processor
import threading
import plexpy
import time
monitor_lock = threading.Lock()
def check_active_sessions(ws_request=False):
with monitor_lock:
pms_connect = pmsconnect.PmsConnect()
session_list = pms_connect.get_current_activity()
monitor_db = database.MonitorDatabase()
monitor_process = activity_processor.ActivityProcessor()
# logger.debug(u"PlexPy Monitor :: Checking for active streams.")
if session_list:
media_container = session_list['sessions']
# Check our temp table for what we must do with the new streams
db_streams = monitor_db.select('SELECT started, session_key, rating_key, media_type, title, parent_title, '
'grandparent_title, user_id, user, friendly_name, ip_address, player, '
'platform, machine_id, parent_rating_key, grandparent_rating_key, state, '
'view_offset, duration, video_decision, audio_decision, width, height, '
'container, video_codec, audio_codec, bitrate, video_resolution, '
'video_framerate, aspect_ratio, audio_channels, transcode_protocol, '
'transcode_container, transcode_video_codec, transcode_audio_codec, '
'transcode_audio_channels, transcode_width, transcode_height, '
'paused_counter, last_paused '
'FROM sessions')
for stream in db_streams:
if any(d['session_key'] == str(stream['session_key']) and d['rating_key'] == str(stream['rating_key'])
for d in media_container):
# The user's session is still active
for session in media_container:
if session['session_key'] == str(stream['session_key']) and \
session['rating_key'] == str(stream['rating_key']):
# The user is still playing the same media item
# Here we can check the play states
if session['state'] != stream['state']:
if session['state'] == 'paused':
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='pause')).start()
if session['state'] == 'playing' and stream['state'] == 'paused':
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='resume')).start()
if stream['state'] == 'paused' and not ws_request:
# The stream is still paused so we need to increment the paused_counter
# Using the set config parameter as the interval, probably not the most accurate but
# it will have to do for now. If it's a websocket request don't use this method.
paused_counter = int(stream['paused_counter']) + plexpy.CONFIG.MONITORING_INTERVAL
monitor_db.action('UPDATE sessions SET paused_counter = ? '
'WHERE session_key = ? AND rating_key = ?',
[paused_counter, stream['session_key'], stream['rating_key']])
if session['state'] == 'buffering' and plexpy.CONFIG.BUFFER_THRESHOLD > 0:
# The stream is buffering so we need to increment the buffer_count
# We're going just increment on every monitor ping,
# would be difficult to keep track otherwise
monitor_db.action('UPDATE sessions SET buffer_count = buffer_count + 1 '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
# Check the current buffer count and last buffer to determine if we should notify
buffer_values = monitor_db.select('SELECT buffer_count, buffer_last_triggered '
'FROM sessions '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
if buffer_values[0]['buffer_count'] >= plexpy.CONFIG.BUFFER_THRESHOLD:
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
# Our first buffer notification
if buffer_values[0]['buffer_count'] == plexpy.CONFIG.BUFFER_THRESHOLD:
logger.info(u"PlexPy Monitor :: User '%s' has triggered a buffer warning."
% stream['user'])
# Set the buffer trigger time
monitor_db.action('UPDATE sessions '
'SET buffer_last_triggered = strftime("%s","now") '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='buffer')).start()
else:
# Subsequent buffer notifications after wait time
if int(time.time()) > buffer_values[0]['buffer_last_triggered'] + \
plexpy.CONFIG.BUFFER_WAIT:
logger.info(u"PlexPy Monitor :: User '%s' has triggered multiple buffer warnings."
% stream['user'])
# Set the buffer trigger time
monitor_db.action('UPDATE sessions '
'SET buffer_last_triggered = strftime("%s","now") '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='buffer')).start()
logger.debug(u"PlexPy Monitor :: Stream buffering. Count is now %s. Last triggered %s."
% (buffer_values[0][0], buffer_values[0][1]))
# Check if the user has reached the offset in the media we defined as the "watched" percent
# Don't trigger if state is buffer as some clients push the progress to the end when
# buffering on start.
if session['view_offset'] and session['duration'] and session['state'] != 'buffering':
if helpers.get_percent(session['view_offset'],
session['duration']) > plexpy.CONFIG.NOTIFY_WATCHED_PERCENT:
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='watched')).start()
else:
# The user has stopped playing a stream
logger.debug(u"PlexPy Monitor :: Removing sessionKey %s ratingKey %s from session queue"
% (stream['session_key'], stream['rating_key']))
monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
# Check if the user has reached the offset in the media we defined as the "watched" percent
if stream['view_offset'] and stream['duration']:
if helpers.get_percent(stream['view_offset'],
stream['duration']) > plexpy.CONFIG.NOTIFY_WATCHED_PERCENT:
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='watched')).start()
# Push any notifications - Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='stop')).start()
# Write the item history on playback stop
monitor_process.write_session_history(session=stream)
# Process the newly received session data
for session in media_container:
monitor_process.write_session(session)
else:
logger.debug(u"PlexPy Monitor :: Unable to read session list.")

View file

@ -13,154 +13,15 @@
# You should have received a copy of the GNU General Public License
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
from plexpy import logger, pmsconnect, notification_handler, log_reader, common, database, helpers
from plexpy import logger, pmsconnect, notification_handler, log_reader, database
import threading
import plexpy
import re
import time
monitor_lock = threading.Lock()
def check_active_sessions():
with monitor_lock:
pms_connect = pmsconnect.PmsConnect()
session_list = pms_connect.get_current_activity()
monitor_db = database.MonitorDatabase()
monitor_process = MonitorProcessing()
# logger.debug(u"PlexPy Monitor :: Checking for active streams.")
if session_list:
media_container = session_list['sessions']
# Check our temp table for what we must do with the new streams
db_streams = monitor_db.select('SELECT started, session_key, rating_key, media_type, title, parent_title, '
'grandparent_title, user_id, user, friendly_name, ip_address, player, '
'platform, machine_id, parent_rating_key, grandparent_rating_key, state, '
'view_offset, duration, video_decision, audio_decision, width, height, '
'container, video_codec, audio_codec, bitrate, video_resolution, '
'video_framerate, aspect_ratio, audio_channels, transcode_protocol, '
'transcode_container, transcode_video_codec, transcode_audio_codec, '
'transcode_audio_channels, transcode_width, transcode_height, paused_counter '
'FROM sessions')
for stream in db_streams:
if any(d['session_key'] == str(stream['session_key']) and d['rating_key'] == str(stream['rating_key'])
for d in media_container):
# The user's session is still active
for session in media_container:
if session['session_key'] == str(stream['session_key']) and \
session['rating_key'] == str(stream['rating_key']):
# The user is still playing the same media item
# Here we can check the play states
if session['state'] != stream['state']:
if session['state'] == 'paused':
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='pause')).start()
if session['state'] == 'playing' and stream['state'] == 'paused':
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='resume')).start()
if stream['state'] == 'paused':
# The stream is still paused so we need to increment the paused_counter
# Using the set config parameter as the interval, probably not the most accurate but
# it will have to do for now.
paused_counter = int(stream['paused_counter']) + plexpy.CONFIG.MONITORING_INTERVAL
monitor_db.action('UPDATE sessions SET paused_counter = ? '
'WHERE session_key = ? AND rating_key = ?',
[paused_counter, stream['session_key'], stream['rating_key']])
if session['state'] == 'buffering' and plexpy.CONFIG.BUFFER_THRESHOLD > 0:
# The stream is buffering so we need to increment the buffer_count
# We're going just increment on every monitor ping,
# would be difficult to keep track otherwise
monitor_db.action('UPDATE sessions SET buffer_count = buffer_count + 1 '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
# Check the current buffer count and last buffer to determine if we should notify
buffer_values = monitor_db.select('SELECT buffer_count, buffer_last_triggered '
'FROM sessions '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
if buffer_values[0]['buffer_count'] >= plexpy.CONFIG.BUFFER_THRESHOLD:
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
# Our first buffer notification
if buffer_values[0]['buffer_count'] == plexpy.CONFIG.BUFFER_THRESHOLD:
logger.info(u"PlexPy Monitor :: User '%s' has triggered a buffer warning."
% stream['user'])
# Set the buffer trigger time
monitor_db.action('UPDATE sessions '
'SET buffer_last_triggered = strftime("%s","now") '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='buffer')).start()
else:
# Subsequent buffer notifications after wait time
if int(time.time()) > buffer_values[0]['buffer_last_triggered'] + \
plexpy.CONFIG.BUFFER_WAIT:
logger.info(u"PlexPy Monitor :: User '%s' has triggered multiple buffer warnings."
% stream['user'])
# Set the buffer trigger time
monitor_db.action('UPDATE sessions '
'SET buffer_last_triggered = strftime("%s","now") '
'WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='buffer')).start()
logger.debug(u"PlexPy Monitor :: Stream buffering. Count is now %s. Last triggered %s."
% (buffer_values[0][0], buffer_values[0][1]))
# Check if the user has reached the offset in the media we defined as the "watched" percent
# Don't trigger if state is buffer as some clients push the progress to the end when
# buffering on start.
if session['progress'] and session['duration'] and session['state'] != 'buffering':
if helpers.get_percent(session['progress'],
session['duration']) > plexpy.CONFIG.NOTIFY_WATCHED_PERCENT:
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='watched')).start()
else:
# The user has stopped playing a stream
logger.debug(u"PlexPy Monitor :: Removing sessionKey %s ratingKey %s from session queue"
% (stream['session_key'], stream['rating_key']))
monitor_db.action('DELETE FROM sessions WHERE session_key = ? AND rating_key = ?',
[stream['session_key'], stream['rating_key']])
# Check if the user has reached the offset in the media we defined as the "watched" percent
if stream['view_offset'] and stream['duration']:
if helpers.get_percent(stream['view_offset'],
stream['duration']) > plexpy.CONFIG.NOTIFY_WATCHED_PERCENT:
# Push any notifications -
# Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='watched')).start()
# Push any notifications - Push it on it's own thread so we don't hold up our db actions
threading.Thread(target=notification_handler.notify,
kwargs=dict(stream_data=stream, notify_action='stop')).start()
# Write the item history on playback stop
monitor_process.write_session_history(session=stream)
# Process the newly received session data
for session in media_container:
monitor_process.write_session(session)
else:
logger.debug(u"PlexPy Monitor :: Unable to read session list.")
class MonitorProcessing(object):
class ActivityProcessor(object):
def __init__(self):
self.db = database.MonitorDatabase()
@ -169,7 +30,7 @@ class MonitorProcessing(object):
values = {'session_key': session['session_key'],
'rating_key': session['rating_key'],
'media_type': session['type'],
'media_type': session['media_type'],
'state': session['state'],
'user_id': session['user_id'],
'user': session['user'],
@ -182,7 +43,7 @@ class MonitorProcessing(object):
'platform': session['platform'],
'parent_rating_key': session['parent_rating_key'],
'grandparent_rating_key': session['grandparent_rating_key'],
'view_offset': session['progress'],
'view_offset': session['view_offset'],
'duration': session['duration'],
'video_decision': session['video_decision'],
'audio_decision': session['audio_decision'],
@ -254,7 +115,7 @@ class MonitorProcessing(object):
session['media_type'] == 'track':
logging_enabled = True
else:
logger.debug(u"PlexPy Monitor :: ratingKey %s not logged. Does not meet logging criteria. "
logger.debug(u"PlexPy ActivityProcessor :: ratingKey %s not logged. Does not meet logging criteria. "
u"Media type is '%s'" % (session['rating_key'], session['media_type']))
if str(session['paused_counter']).isdigit():
@ -266,24 +127,24 @@ class MonitorProcessing(object):
if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \
(real_play_time < int(plexpy.CONFIG.LOGGING_IGNORE_INTERVAL)):
logging_enabled = False
logger.debug(u"PlexPy Monitor :: Play duration for ratingKey %s is %s secs which is less than %s "
logger.debug(u"PlexPy ActivityProcessor :: Play duration for ratingKey %s is %s secs which is less than %s "
u"seconds, so we're not logging it." %
(session['rating_key'], str(real_play_time), plexpy.CONFIG.LOGGING_IGNORE_INTERVAL))
elif is_import and import_ignore_interval:
if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \
(real_play_time < int(import_ignore_interval)):
logging_enabled = False
logger.debug(u"PlexPy Monitor :: Play duration for ratingKey %s is %s secs which is less than %s "
logger.debug(u"PlexPy ActivityProcessor :: Play duration for ratingKey %s is %s secs which is less than %s "
u"seconds, so we're not logging it." %
(session['rating_key'], str(real_play_time),
import_ignore_interval))
if not user_details['keep_history'] and not is_import:
logging_enabled = False
logger.debug(u"PlexPy Monitor :: History logging for user '%s' is disabled." % session['user'])
logger.debug(u"PlexPy ActivityProcessor :: History logging for user '%s' is disabled." % session['user'])
if logging_enabled:
# logger.debug(u"PlexPy Monitor :: Attempting to write to session_history table...")
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history table...")
query = 'INSERT INTO session_history (started, stopped, rating_key, parent_rating_key, ' \
'grandparent_rating_key, media_type, user_id, user, ip_address, paused_counter, player, ' \
'platform, machine_id, view_offset) VALUES ' \
@ -294,14 +155,14 @@ class MonitorProcessing(object):
session['ip_address'], session['paused_counter'], session['player'], session['platform'],
session['machine_id'], session['view_offset']]
# logger.debug(u"PlexPy Monitor :: Writing session_history transaction...")
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history transaction...")
self.db.action(query=query, args=args)
# logger.debug(u"PlexPy Monitor :: Successfully written history item, last id for session_history is %s"
# logger.debug(u"PlexPy ActivityProcessor :: Successfully written history item, last id for session_history is %s"
# % last_id)
# Write the session_history_media_info table
# logger.debug(u"PlexPy Monitor :: Attempting to write to session_history_media_info table...")
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history_media_info table...")
query = 'INSERT INTO session_history_media_info (id, rating_key, video_decision, audio_decision, ' \
'duration, width, height, container, video_codec, audio_codec, bitrate, video_resolution, ' \
'video_framerate, aspect_ratio, audio_channels, transcode_protocol, transcode_container, ' \
@ -317,11 +178,11 @@ class MonitorProcessing(object):
session['transcode_video_codec'], session['transcode_audio_codec'],
session['transcode_audio_channels'], session['transcode_width'], session['transcode_height']]
# logger.debug(u"PlexPy Monitor :: Writing session_history_media_info transaction...")
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_media_info transaction...")
self.db.action(query=query, args=args)
if not is_import:
logger.debug(u"PlexPy Monitor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
logger.debug(u"PlexPy ActivityProcessor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
pms_connect = pmsconnect.PmsConnect()
result = pms_connect.get_metadata_details(rating_key=str(session['rating_key']))
metadata = result['metadata']
@ -342,7 +203,7 @@ class MonitorProcessing(object):
else:
full_title = metadata['title']
# logger.debug(u"PlexPy Monitor :: Attempting to write to session_history_metadata table...")
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history_metadata table...")
query = 'INSERT INTO session_history_metadata (id, rating_key, parent_rating_key, ' \
'grandparent_rating_key, title, parent_title, grandparent_title, full_title, media_index, ' \
'parent_media_index, thumb, parent_thumb, grandparent_thumb, art, media_type, year, ' \
@ -359,12 +220,12 @@ class MonitorProcessing(object):
metadata['last_viewed_at'], metadata['content_rating'], metadata['summary'], metadata['tagline'],
metadata['rating'], metadata['duration'], metadata['guid'], directors, writers, actors, genres, metadata['studio']]
# logger.debug(u"PlexPy Monitor :: Writing session_history_metadata transaction...")
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_metadata transaction...")
self.db.action(query=query, args=args)
def find_session_ip(self, rating_key=None, machine_id=None):
logger.debug(u"PlexPy Monitor :: Requesting log lines...")
logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...")
log_lines = log_reader.get_log_tail(window=5000, parsed=False)
rating_key_line = 'ratingKey=' + rating_key
@ -380,18 +241,18 @@ class MonitorProcessing(object):
if ipv4:
# The logged IP will always be the first match and we don't want localhost entries
if ipv4[0] != '127.0.0.1':
logger.debug(u"PlexPy Monitor :: Matched IP address (%s) for stream ratingKey %s "
logger.debug(u"PlexPy ActivityProcessor :: Matched IP address (%s) for stream ratingKey %s "
u"and machineIdentifier %s."
% (ipv4[0], rating_key, machine_id))
return ipv4[0]
logger.debug(u"PlexPy Monitor :: Unable to find IP address on first pass. "
logger.debug(u"PlexPy ActivityProcessor :: Unable to find IP address on first pass. "
u"Attempting fallback check in 5 seconds...")
# Wait for the log to catch up and read in new lines
time.sleep(5)
logger.debug(u"PlexPy Monitor :: Requesting log lines...")
logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...")
log_lines = log_reader.get_log_tail(window=5000, parsed=False)
for line in reversed(log_lines):
@ -403,10 +264,100 @@ class MonitorProcessing(object):
if ipv4:
# The logged IP will always be the first match and we don't want localhost entries
if ipv4[0] != '127.0.0.1':
logger.debug(u"PlexPy Monitor :: Matched IP address (%s) for stream ratingKey %s." %
logger.debug(u"PlexPy ActivityProcessor :: Matched IP address (%s) for stream ratingKey %s." %
(ipv4[0], rating_key))
return ipv4[0]
logger.debug(u"PlexPy Monitor :: Unable to find IP address on fallback search. Not logging IP address.")
logger.debug(u"PlexPy ActivityProcessor :: Unable to find IP address on fallback search. Not logging IP address.")
return None
def get_session_by_key(self, session_key=None):
if str(session_key).isdigit():
result = self.db.select('SELECT started, session_key, rating_key, media_type, title, parent_title, '
'grandparent_title, user_id, user, friendly_name, ip_address, player, '
'platform, machine_id, parent_rating_key, grandparent_rating_key, state, '
'view_offset, duration, video_decision, audio_decision, width, height, '
'container, video_codec, audio_codec, bitrate, video_resolution, '
'video_framerate, aspect_ratio, audio_channels, transcode_protocol, '
'transcode_container, transcode_video_codec, transcode_audio_codec, '
'transcode_audio_channels, transcode_width, transcode_height, '
'paused_counter, last_paused '
'FROM sessions WHERE session_key = ? LIMIT 1', args=[session_key])
for session in result:
if session:
return session
return None
def set_session_state(self, session_key=None, state=None, view_offset=0):
if str(session_key).isdigit() and str(view_offset).isdigit():
values = {'view_offset': int(view_offset)}
if state:
values['state'] = state
keys = {'session_key': session_key}
result = self.db.upsert('sessions', values, keys)
return result
return None
def delete_session(self, session_key=None):
if str(session_key).isdigit():
self.db.action('DELETE FROM sessions WHERE session_key = ?', [session_key])
def set_session_last_paused(self, session_key=None, timestamp=None):
if str(session_key).isdigit():
result = self.db.select('SELECT last_paused, paused_counter '
'FROM sessions '
'WHERE session_key = ?', args=[session_key])
paused_counter = None
for session in result:
if session['last_paused']:
paused_offset = int(time.time()) - int(session['last_paused'])
paused_counter = int(session['paused_counter']) + int(paused_offset)
values = {'state': 'playing',
'last_paused': timestamp
}
if paused_counter:
values['paused_counter'] = paused_counter
keys = {'session_key': session_key}
self.db.upsert('sessions', values, keys)
def increment_session_buffer_count(self, session_key=None):
if str(session_key).isdigit():
self.db.action('UPDATE sessions SET buffer_count = buffer_count + 1 '
'WHERE session_key = ?',
[session_key])
def get_session_buffer_count(self, session_key=None):
if str(session_key).isdigit():
buffer_count = self.db.select_single('SELECT buffer_count '
'FROM sessions '
'WHERE session_key = ?',
[session_key])
if buffer_count:
return buffer_count
return 0
def set_session_buffer_trigger_time(self, session_key=None):
if str(session_key).isdigit():
self.db.action('UPDATE sessions SET buffer_last_triggered = strftime("%s","now") '
'WHERE session_key = ?',
[session_key])
def get_session_buffer_trigger_time(self, session_key=None):
if str(session_key).isdigit():
last_time = self.db.select_single('SELECT buffer_last_triggered '
'FROM sessions '
'WHERE session_key = ?',
[session_key])
if last_time:
return last_time
return None

View file

@ -111,6 +111,7 @@ _CONFIG_DEFINITIONS = {
'MUSIC_NOTIFY_ON_PAUSE': (int, 'Monitoring', 0),
'MUSIC_LOGGING_ENABLE': (int, 'Monitoring', 0),
'MONITORING_INTERVAL': (int, 'Monitoring', 60),
'MONITORING_USE_WEBSOCKET': (int, 'Monitoring', 0),
'NMA_APIKEY': (str, 'NMA', ''),
'NMA_ENABLED': (int, 'NMA', 0),
'NMA_PRIORITY': (int, 'NMA', 0),

View file

@ -778,3 +778,4 @@ class DataFactory(object):
return 'Deleted all items for user_id %s.' % user_id
else:
return 'Unable to delete items. Input user_id not valid.'

View file

@ -705,9 +705,9 @@ class PmsConnect(object):
'transcode_container': transcode_container,
'transcode_protocol': transcode_protocol,
'duration': duration,
'progress': progress,
'view_offset': progress,
'progress_percent': str(helpers.get_percent(progress, duration)),
'type': 'track',
'media_type': 'track',
'indexes': 0
}
@ -826,14 +826,14 @@ class PmsConnect(object):
'transcode_container': transcode_container,
'transcode_protocol': transcode_protocol,
'duration': duration,
'progress': progress,
'view_offset': progress,
'progress_percent': str(helpers.get_percent(progress, duration)),
'indexes': use_indexes
}
if helpers.get_xml_attr(session, 'ratingKey').isdigit():
session_output['type'] = helpers.get_xml_attr(session, 'type')
session_output['media_type'] = helpers.get_xml_attr(session, 'type')
else:
session_output['type'] = 'clip'
session_output['media_type'] = 'clip'
elif helpers.get_xml_attr(session, 'type') == 'movie':
session_output = {'session_key': helpers.get_xml_attr(session, 'sessionKey'),
@ -882,14 +882,14 @@ class PmsConnect(object):
'transcode_container': transcode_container,
'transcode_protocol': transcode_protocol,
'duration': duration,
'progress': progress,
'view_offset': progress,
'progress_percent': str(helpers.get_percent(progress, duration)),
'indexes': use_indexes
}
if helpers.get_xml_attr(session, 'ratingKey').isdigit():
session_output['type'] = helpers.get_xml_attr(session, 'type')
session_output['media_type'] = helpers.get_xml_attr(session, 'type')
else:
session_output['type'] = 'clip'
session_output['media_type'] = 'clip'
elif helpers.get_xml_attr(session, 'type') == 'clip':
session_output = {'session_key': helpers.get_xml_attr(session, 'sessionKey'),
@ -938,9 +938,9 @@ class PmsConnect(object):
'transcode_container': transcode_container,
'transcode_protocol': transcode_protocol,
'duration': duration,
'progress': progress,
'view_offset': progress,
'progress_percent': str(helpers.get_percent(progress, duration)),
'type': helpers.get_xml_attr(session, 'type'),
'media_type': helpers.get_xml_attr(session, 'type'),
'indexes': 0
}
@ -1027,9 +1027,9 @@ class PmsConnect(object):
'transcode_container': transcode_container,
'transcode_protocol': transcode_protocol,
'duration': '',
'progress': '',
'view_offset': '',
'progress_percent': '100',
'type': 'photo',
'media_type': 'photo',
'indexes': 0
}
@ -1083,7 +1083,6 @@ class PmsConnect(object):
}
children_list.append(children_output)
output = {'children_count': helpers.get_xml_attr(xml_head[0], 'size'),
'children_type': helpers.get_xml_attr(xml_head[0], 'viewGroup'),
'title': helpers.get_xml_attr(xml_head[0], 'title2'),

142
plexpy/web_socket.py Normal file
View file

@ -0,0 +1,142 @@
# This file is part of PlexPy.
#
# PlexPy is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# PlexPy is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
# Mostly borrowed from https://github.com/trakt/Plex-Trakt-Scrobbler
from plexpy import logger, activity_pinger
import threading
import plexpy
import json
import time
import websocket
name = 'websocket'
opcode_data = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
def start_thread():
# Check for any existing sessions on start up
activity_pinger.check_active_sessions(ws_request=True)
# Start the websocket listener on it's own thread
threading.Thread(target=run).start()
def run():
from websocket import create_connection
uri = 'ws://%s:%s/:/websockets/notifications' % (
plexpy.CONFIG.PMS_IP,
plexpy.CONFIG.PMS_PORT
)
# Set authentication token (if one is available)
if plexpy.CONFIG.PMS_TOKEN:
uri += '?X-Plex-Token=' + plexpy.CONFIG.PMS_TOKEN
ws_connected = False
reconnects = 0
# Try an open the websocket connection - if it fails after 5 retries fallback to polling
while not ws_connected and reconnects < 5:
try:
logger.info(u'PlexPy WebSocket :: Opening websocket, connection attempt %s.' % str(reconnects + 1))
ws = create_connection(uri)
reconnects = 0
ws_connected = True
logger.info(u'PlexPy WebSocket :: Ready')
except IOError, e:
logger.error(u'PlexPy WebSocket :: %s.' % e)
reconnects += 1
time.sleep(5)
while ws_connected:
try:
process(*receive(ws))
# successfully received data, reset reconnects counter
reconnects = 0
except websocket.WebSocketConnectionClosedException:
if reconnects <= 5:
reconnects += 1
# Increasing sleep interval between reconnections
if reconnects > 1:
time.sleep(2 * (reconnects - 1))
logger.warn(u'PlexPy WebSocket :: Connection has closed, reconnecting...')
try:
ws = create_connection(uri)
except IOError, e:
logger.info(u'PlexPy WebSocket :: %s.' % e)
else:
ws_connected = False
break
if not ws_connected:
logger.error(u'PlexPy WebSocket :: Connection unavailable, falling back to polling.')
plexpy.POLLING_FAILOVER = True
plexpy.initialize_scheduler()
logger.debug(u'Leaving thread.')
def receive(ws):
frame = ws.recv_frame()
if not frame:
raise websocket.WebSocketException("Not a valid frame %s" % frame)
elif frame.opcode in opcode_data:
return frame.opcode, frame.data
elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
ws.send_close()
return frame.opcode, None
elif frame.opcode == websocket.ABNF.OPCODE_PING:
ws.pong("Hi!")
return None, None
def process(opcode, data):
from plexpy import activity_handler
if opcode not in opcode_data:
return False
try:
info = json.loads(data)
except Exception as ex:
logger.warn(u'PlexPy WebSocket :: Error decoding message from websocket: %s' % ex)
logger.debug(data)
return False
type = info.get('type')
if not type:
return False
if type == 'playing':
# logger.debug('%s.playing %s' % (name, info))
try:
time_line = info.get('_children')
except:
logger.debug(u"PlexPy WebSocket :: Session found but unable to get timeline data.")
return False
activity = activity_handler.ActivityHandler(timeline=time_line[0])
activity.process()
return True

View file

@ -430,6 +430,7 @@ class WebInterface(object):
"movie_notify_on_pause": checked(plexpy.CONFIG.MOVIE_NOTIFY_ON_PAUSE),
"music_notify_on_pause": checked(plexpy.CONFIG.MUSIC_NOTIFY_ON_PAUSE),
"monitoring_interval": plexpy.CONFIG.MONITORING_INTERVAL,
"monitoring_use_websocket": checked(plexpy.CONFIG.MONITORING_USE_WEBSOCKET),
"refresh_users_interval": plexpy.CONFIG.REFRESH_USERS_INTERVAL,
"refresh_users_on_startup": checked(plexpy.CONFIG.REFRESH_USERS_ON_STARTUP),
"ip_logging_enable": checked(plexpy.CONFIG.IP_LOGGING_ENABLE),
@ -468,7 +469,7 @@ class WebInterface(object):
checked_configs = [
"launch_browser", "enable_https", "api_enabled", "freeze_db", "check_github",
"grouping_global_history", "grouping_user_history", "grouping_charts", "pms_use_bif", "pms_ssl",
"tv_notify_enable", "movie_notify_enable", "music_notify_enable",
"tv_notify_enable", "movie_notify_enable", "music_notify_enable", "monitoring_use_websocket",
"tv_notify_on_start", "movie_notify_on_start", "music_notify_on_start",
"tv_notify_on_stop", "movie_notify_on_stop", "music_notify_on_stop",
"tv_notify_on_pause", "movie_notify_on_pause", "music_notify_on_pause", "refresh_users_on_startup",