mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-11 07:46:07 -07:00
Get buffer and watched notifications working again.
This commit is contained in:
parent
5595ef2e20
commit
74e8d7d329
3 changed files with 97 additions and 28 deletions
|
@ -395,7 +395,7 @@ available_notification_agents = notifiers.available_notification_agents()
|
||||||
<label>
|
<label>
|
||||||
<input type="checkbox" id="monitoring_use_websocket" name="monitoring_use_websocket" value="1" ${config['monitoring_use_websocket']}> Use Websocket (requires restart)
|
<input type="checkbox" id="monitoring_use_websocket" name="monitoring_use_websocket" value="1" ${config['monitoring_use_websocket']}> Use Websocket (requires restart)
|
||||||
</label>
|
</label>
|
||||||
<p class="help-block">Instead of polling the server at regular intervals let the server tell us when something happens.</p>
|
<p class="help-block">Instead of polling the server at regular intervals let the server tell us when something happens. This is currently experimental.</p>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
<div class="padded-header">
|
<div class="padded-header">
|
||||||
|
|
|
@ -14,6 +14,8 @@
|
||||||
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
|
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
import plexpy
|
||||||
|
|
||||||
from plexpy import logger, pmsconnect, activity_processor, threading, notification_handler
|
from plexpy import logger, pmsconnect, activity_processor, threading, notification_handler
|
||||||
|
|
||||||
|
|
||||||
|
@ -88,9 +90,6 @@ class ActivityHandler(object):
|
||||||
# Remove the session from our temp session table
|
# Remove the session from our temp session table
|
||||||
ap.delete_session(session_key=self.get_session_key())
|
ap.delete_session(session_key=self.get_session_key())
|
||||||
|
|
||||||
def on_buffer(self):
|
|
||||||
pass
|
|
||||||
|
|
||||||
def on_pause(self):
|
def on_pause(self):
|
||||||
if self.is_valid_session():
|
if self.is_valid_session():
|
||||||
logger.debug(u"PlexPy ActivityHandler :: Session %s has been paused." % str(self.get_session_key()))
|
logger.debug(u"PlexPy ActivityHandler :: Session %s has been paused." % str(self.get_session_key()))
|
||||||
|
@ -131,9 +130,40 @@ class ActivityHandler(object):
|
||||||
threading.Thread(target=notification_handler.notify,
|
threading.Thread(target=notification_handler.notify,
|
||||||
kwargs=dict(stream_data=db_session, notify_action='resume')).start()
|
kwargs=dict(stream_data=db_session, notify_action='resume')).start()
|
||||||
|
|
||||||
|
def on_buffer(self):
|
||||||
|
if self.is_valid_session():
|
||||||
|
logger.debug(u"PlexPy ActivityHandler :: Session %s is buffering." % self.get_session_key())
|
||||||
|
ap = activity_processor.ActivityProcessor()
|
||||||
|
db_stream = ap.get_session_by_key(session_key=self.get_session_key())
|
||||||
|
|
||||||
|
# Increment our buffer count
|
||||||
|
ap.increment_session_buffer_count(session_key=self.get_session_key())
|
||||||
|
|
||||||
|
# Get our current buffer count
|
||||||
|
current_buffer_count = ap.get_session_buffer_count(self.get_session_key())
|
||||||
|
logger.debug(u"PlexPy ActivityHandler :: Session %s buffer count is %s." %
|
||||||
|
(self.get_session_key(), current_buffer_count))
|
||||||
|
|
||||||
|
# Get our last triggered time
|
||||||
|
buffer_last_triggered = ap.get_session_buffer_trigger_time(self.get_session_key())
|
||||||
|
|
||||||
|
time_since_last_trigger = 0
|
||||||
|
if buffer_last_triggered:
|
||||||
|
logger.debug(u"PlexPy ActivityHandler :: Session %s buffer last triggered at %s." %
|
||||||
|
(self.get_session_key(), buffer_last_triggered))
|
||||||
|
time_since_last_trigger = int(time.time()) - int(buffer_last_triggered)
|
||||||
|
|
||||||
|
if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \
|
||||||
|
time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT:
|
||||||
|
ap.set_session_buffer_trigger_time(session_key=self.get_session_key())
|
||||||
|
threading.Thread(target=notification_handler.notify,
|
||||||
|
kwargs=dict(stream_data=db_stream, notify_action='buffer')).start()
|
||||||
|
|
||||||
# This function receives events from our websocket connection
|
# This function receives events from our websocket connection
|
||||||
def process(self):
|
def process(self):
|
||||||
if self.is_valid_session():
|
if self.is_valid_session():
|
||||||
|
from plexpy import helpers
|
||||||
|
|
||||||
ap = activity_processor.ActivityProcessor()
|
ap = activity_processor.ActivityProcessor()
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
||||||
|
|
||||||
|
@ -142,19 +172,24 @@ class ActivityHandler(object):
|
||||||
this_state = self.timeline['state']
|
this_state = self.timeline['state']
|
||||||
last_state = db_session['state']
|
last_state = db_session['state']
|
||||||
|
|
||||||
|
# Start our state checks
|
||||||
if this_state != last_state:
|
if this_state != last_state:
|
||||||
# logger.debug(u"PlexPy ActivityHandler :: Last state %s :: Current state %s" %
|
|
||||||
# (last_state, this_state))
|
|
||||||
if this_state == 'paused':
|
if this_state == 'paused':
|
||||||
self.on_pause()
|
self.on_pause()
|
||||||
elif last_state == 'paused' and this_state == 'playing':
|
elif last_state == 'paused' and this_state == 'playing':
|
||||||
self.on_resume()
|
self.on_resume()
|
||||||
elif this_state == 'stopped':
|
elif this_state == 'stopped':
|
||||||
self.on_stop()
|
self.on_stop()
|
||||||
# else:
|
elif this_state == 'buffering':
|
||||||
# logger.debug(u"PlexPy ActivityHandler :: Session %s state has not changed." %
|
self.on_buffer()
|
||||||
# self.get_session_key())
|
|
||||||
|
# Monitor if the stream has reached the watch percentage for notifications
|
||||||
|
# The only purpose of this is for notifications
|
||||||
|
progress_percent = helpers.get_percent(self.timeline['viewOffset'], db_session['duration'])
|
||||||
|
if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT and this_state != 'buffering':
|
||||||
|
threading.Thread(target=notification_handler.notify,
|
||||||
|
kwargs=dict(stream_data=db_session, notify_action='watched')).start()
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# We don't have this session in our table yet, start a new one.
|
# We don't have this session in our table yet, start a new one.
|
||||||
# logger.debug(u"PlexPy ActivityHandler :: Session %s has started." % self.get_session_key())
|
|
||||||
self.on_start()
|
self.on_start()
|
|
@ -115,7 +115,7 @@ class ActivityProcessor(object):
|
||||||
session['media_type'] == 'track':
|
session['media_type'] == 'track':
|
||||||
logging_enabled = True
|
logging_enabled = True
|
||||||
else:
|
else:
|
||||||
logger.debug(u"PlexPy Monitor :: ratingKey %s not logged. Does not meet logging criteria. "
|
logger.debug(u"PlexPy ActivityProcessor :: ratingKey %s not logged. Does not meet logging criteria. "
|
||||||
u"Media type is '%s'" % (session['rating_key'], session['media_type']))
|
u"Media type is '%s'" % (session['rating_key'], session['media_type']))
|
||||||
|
|
||||||
if str(session['paused_counter']).isdigit():
|
if str(session['paused_counter']).isdigit():
|
||||||
|
@ -127,24 +127,24 @@ class ActivityProcessor(object):
|
||||||
if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \
|
if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \
|
||||||
(real_play_time < int(plexpy.CONFIG.LOGGING_IGNORE_INTERVAL)):
|
(real_play_time < int(plexpy.CONFIG.LOGGING_IGNORE_INTERVAL)):
|
||||||
logging_enabled = False
|
logging_enabled = False
|
||||||
logger.debug(u"PlexPy Monitor :: Play duration for ratingKey %s is %s secs which is less than %s "
|
logger.debug(u"PlexPy ActivityProcessor :: Play duration for ratingKey %s is %s secs which is less than %s "
|
||||||
u"seconds, so we're not logging it." %
|
u"seconds, so we're not logging it." %
|
||||||
(session['rating_key'], str(real_play_time), plexpy.CONFIG.LOGGING_IGNORE_INTERVAL))
|
(session['rating_key'], str(real_play_time), plexpy.CONFIG.LOGGING_IGNORE_INTERVAL))
|
||||||
elif is_import and import_ignore_interval:
|
elif is_import and import_ignore_interval:
|
||||||
if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \
|
if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \
|
||||||
(real_play_time < int(import_ignore_interval)):
|
(real_play_time < int(import_ignore_interval)):
|
||||||
logging_enabled = False
|
logging_enabled = False
|
||||||
logger.debug(u"PlexPy Monitor :: Play duration for ratingKey %s is %s secs which is less than %s "
|
logger.debug(u"PlexPy ActivityProcessor :: Play duration for ratingKey %s is %s secs which is less than %s "
|
||||||
u"seconds, so we're not logging it." %
|
u"seconds, so we're not logging it." %
|
||||||
(session['rating_key'], str(real_play_time),
|
(session['rating_key'], str(real_play_time),
|
||||||
import_ignore_interval))
|
import_ignore_interval))
|
||||||
|
|
||||||
if not user_details['keep_history'] and not is_import:
|
if not user_details['keep_history'] and not is_import:
|
||||||
logging_enabled = False
|
logging_enabled = False
|
||||||
logger.debug(u"PlexPy Monitor :: History logging for user '%s' is disabled." % session['user'])
|
logger.debug(u"PlexPy ActivityProcessor :: History logging for user '%s' is disabled." % session['user'])
|
||||||
|
|
||||||
if logging_enabled:
|
if logging_enabled:
|
||||||
# logger.debug(u"PlexPy Monitor :: Attempting to write to session_history table...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history table...")
|
||||||
query = 'INSERT INTO session_history (started, stopped, rating_key, parent_rating_key, ' \
|
query = 'INSERT INTO session_history (started, stopped, rating_key, parent_rating_key, ' \
|
||||||
'grandparent_rating_key, media_type, user_id, user, ip_address, paused_counter, player, ' \
|
'grandparent_rating_key, media_type, user_id, user, ip_address, paused_counter, player, ' \
|
||||||
'platform, machine_id, view_offset) VALUES ' \
|
'platform, machine_id, view_offset) VALUES ' \
|
||||||
|
@ -155,14 +155,14 @@ class ActivityProcessor(object):
|
||||||
session['ip_address'], session['paused_counter'], session['player'], session['platform'],
|
session['ip_address'], session['paused_counter'], session['player'], session['platform'],
|
||||||
session['machine_id'], session['view_offset']]
|
session['machine_id'], session['view_offset']]
|
||||||
|
|
||||||
# logger.debug(u"PlexPy Monitor :: Writing session_history transaction...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history transaction...")
|
||||||
self.db.action(query=query, args=args)
|
self.db.action(query=query, args=args)
|
||||||
|
|
||||||
# logger.debug(u"PlexPy Monitor :: Successfully written history item, last id for session_history is %s"
|
# logger.debug(u"PlexPy ActivityProcessor :: Successfully written history item, last id for session_history is %s"
|
||||||
# % last_id)
|
# % last_id)
|
||||||
|
|
||||||
# Write the session_history_media_info table
|
# Write the session_history_media_info table
|
||||||
# logger.debug(u"PlexPy Monitor :: Attempting to write to session_history_media_info table...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history_media_info table...")
|
||||||
query = 'INSERT INTO session_history_media_info (id, rating_key, video_decision, audio_decision, ' \
|
query = 'INSERT INTO session_history_media_info (id, rating_key, video_decision, audio_decision, ' \
|
||||||
'duration, width, height, container, video_codec, audio_codec, bitrate, video_resolution, ' \
|
'duration, width, height, container, video_codec, audio_codec, bitrate, video_resolution, ' \
|
||||||
'video_framerate, aspect_ratio, audio_channels, transcode_protocol, transcode_container, ' \
|
'video_framerate, aspect_ratio, audio_channels, transcode_protocol, transcode_container, ' \
|
||||||
|
@ -178,11 +178,11 @@ class ActivityProcessor(object):
|
||||||
session['transcode_video_codec'], session['transcode_audio_codec'],
|
session['transcode_video_codec'], session['transcode_audio_codec'],
|
||||||
session['transcode_audio_channels'], session['transcode_width'], session['transcode_height']]
|
session['transcode_audio_channels'], session['transcode_width'], session['transcode_height']]
|
||||||
|
|
||||||
# logger.debug(u"PlexPy Monitor :: Writing session_history_media_info transaction...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_media_info transaction...")
|
||||||
self.db.action(query=query, args=args)
|
self.db.action(query=query, args=args)
|
||||||
|
|
||||||
if not is_import:
|
if not is_import:
|
||||||
logger.debug(u"PlexPy Monitor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
|
logger.debug(u"PlexPy ActivityProcessor :: Fetching metadata for item ratingKey %s" % session['rating_key'])
|
||||||
pms_connect = pmsconnect.PmsConnect()
|
pms_connect = pmsconnect.PmsConnect()
|
||||||
result = pms_connect.get_metadata_details(rating_key=str(session['rating_key']))
|
result = pms_connect.get_metadata_details(rating_key=str(session['rating_key']))
|
||||||
metadata = result['metadata']
|
metadata = result['metadata']
|
||||||
|
@ -203,7 +203,7 @@ class ActivityProcessor(object):
|
||||||
else:
|
else:
|
||||||
full_title = metadata['title']
|
full_title = metadata['title']
|
||||||
|
|
||||||
# logger.debug(u"PlexPy Monitor :: Attempting to write to session_history_metadata table...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history_metadata table...")
|
||||||
query = 'INSERT INTO session_history_metadata (id, rating_key, parent_rating_key, ' \
|
query = 'INSERT INTO session_history_metadata (id, rating_key, parent_rating_key, ' \
|
||||||
'grandparent_rating_key, title, parent_title, grandparent_title, full_title, media_index, ' \
|
'grandparent_rating_key, title, parent_title, grandparent_title, full_title, media_index, ' \
|
||||||
'parent_media_index, thumb, parent_thumb, grandparent_thumb, art, media_type, year, ' \
|
'parent_media_index, thumb, parent_thumb, grandparent_thumb, art, media_type, year, ' \
|
||||||
|
@ -220,12 +220,12 @@ class ActivityProcessor(object):
|
||||||
metadata['last_viewed_at'], metadata['content_rating'], metadata['summary'], metadata['tagline'],
|
metadata['last_viewed_at'], metadata['content_rating'], metadata['summary'], metadata['tagline'],
|
||||||
metadata['rating'], metadata['duration'], metadata['guid'], directors, writers, actors, genres, metadata['studio']]
|
metadata['rating'], metadata['duration'], metadata['guid'], directors, writers, actors, genres, metadata['studio']]
|
||||||
|
|
||||||
# logger.debug(u"PlexPy Monitor :: Writing session_history_metadata transaction...")
|
# logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_metadata transaction...")
|
||||||
self.db.action(query=query, args=args)
|
self.db.action(query=query, args=args)
|
||||||
|
|
||||||
def find_session_ip(self, rating_key=None, machine_id=None):
|
def find_session_ip(self, rating_key=None, machine_id=None):
|
||||||
|
|
||||||
logger.debug(u"PlexPy Monitor :: Requesting log lines...")
|
logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...")
|
||||||
log_lines = log_reader.get_log_tail(window=5000, parsed=False)
|
log_lines = log_reader.get_log_tail(window=5000, parsed=False)
|
||||||
|
|
||||||
rating_key_line = 'ratingKey=' + rating_key
|
rating_key_line = 'ratingKey=' + rating_key
|
||||||
|
@ -241,18 +241,18 @@ class ActivityProcessor(object):
|
||||||
if ipv4:
|
if ipv4:
|
||||||
# The logged IP will always be the first match and we don't want localhost entries
|
# The logged IP will always be the first match and we don't want localhost entries
|
||||||
if ipv4[0] != '127.0.0.1':
|
if ipv4[0] != '127.0.0.1':
|
||||||
logger.debug(u"PlexPy Monitor :: Matched IP address (%s) for stream ratingKey %s "
|
logger.debug(u"PlexPy ActivityProcessor :: Matched IP address (%s) for stream ratingKey %s "
|
||||||
u"and machineIdentifier %s."
|
u"and machineIdentifier %s."
|
||||||
% (ipv4[0], rating_key, machine_id))
|
% (ipv4[0], rating_key, machine_id))
|
||||||
return ipv4[0]
|
return ipv4[0]
|
||||||
|
|
||||||
logger.debug(u"PlexPy Monitor :: Unable to find IP address on first pass. "
|
logger.debug(u"PlexPy ActivityProcessor :: Unable to find IP address on first pass. "
|
||||||
u"Attempting fallback check in 5 seconds...")
|
u"Attempting fallback check in 5 seconds...")
|
||||||
|
|
||||||
# Wait for the log to catch up and read in new lines
|
# Wait for the log to catch up and read in new lines
|
||||||
time.sleep(5)
|
time.sleep(5)
|
||||||
|
|
||||||
logger.debug(u"PlexPy Monitor :: Requesting log lines...")
|
logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...")
|
||||||
log_lines = log_reader.get_log_tail(window=5000, parsed=False)
|
log_lines = log_reader.get_log_tail(window=5000, parsed=False)
|
||||||
|
|
||||||
for line in reversed(log_lines):
|
for line in reversed(log_lines):
|
||||||
|
@ -264,11 +264,11 @@ class ActivityProcessor(object):
|
||||||
if ipv4:
|
if ipv4:
|
||||||
# The logged IP will always be the first match and we don't want localhost entries
|
# The logged IP will always be the first match and we don't want localhost entries
|
||||||
if ipv4[0] != '127.0.0.1':
|
if ipv4[0] != '127.0.0.1':
|
||||||
logger.debug(u"PlexPy Monitor :: Matched IP address (%s) for stream ratingKey %s." %
|
logger.debug(u"PlexPy ActivityProcessor :: Matched IP address (%s) for stream ratingKey %s." %
|
||||||
(ipv4[0], rating_key))
|
(ipv4[0], rating_key))
|
||||||
return ipv4[0]
|
return ipv4[0]
|
||||||
|
|
||||||
logger.debug(u"PlexPy Monitor :: Unable to find IP address on fallback search. Not logging IP address.")
|
logger.debug(u"PlexPy ActivityProcessor :: Unable to find IP address on fallback search. Not logging IP address.")
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
@ -327,3 +327,37 @@ class ActivityProcessor(object):
|
||||||
|
|
||||||
keys = {'session_key': session_key}
|
keys = {'session_key': session_key}
|
||||||
self.db.upsert('sessions', values, keys)
|
self.db.upsert('sessions', values, keys)
|
||||||
|
|
||||||
|
def increment_session_buffer_count(self, session_key=None):
|
||||||
|
if str(session_key).isdigit():
|
||||||
|
self.db.action('UPDATE sessions SET buffer_count = buffer_count + 1 '
|
||||||
|
'WHERE session_key = ?',
|
||||||
|
[session_key])
|
||||||
|
|
||||||
|
def get_session_buffer_count(self, session_key=None):
|
||||||
|
if str(session_key).isdigit():
|
||||||
|
buffer_count = self.db.select_single('SELECT buffer_count '
|
||||||
|
'FROM sessions '
|
||||||
|
'WHERE session_key = ?',
|
||||||
|
[session_key])
|
||||||
|
if buffer_count:
|
||||||
|
return buffer_count
|
||||||
|
|
||||||
|
return 0
|
||||||
|
|
||||||
|
def set_session_buffer_trigger_time(self, session_key=None):
|
||||||
|
if str(session_key).isdigit():
|
||||||
|
self.db.action('UPDATE sessions SET buffer_last_triggered = strftime("%s","now") '
|
||||||
|
'WHERE session_key = ?',
|
||||||
|
[session_key])
|
||||||
|
|
||||||
|
def get_session_buffer_trigger_time(self, session_key=None):
|
||||||
|
if str(session_key).isdigit():
|
||||||
|
last_time = self.db.select_single('SELECT buffer_last_triggered '
|
||||||
|
'FROM sessions '
|
||||||
|
'WHERE session_key = ?',
|
||||||
|
[session_key])
|
||||||
|
if last_time:
|
||||||
|
return last_time
|
||||||
|
|
||||||
|
return None
|
Loading…
Add table
Add a link
Reference in a new issue