mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-08-13 18:16:57 -07:00
Monitor stream intro/credits marker activity
This commit is contained in:
parent
a8539b2927
commit
9a152932ee
3 changed files with 226 additions and 181 deletions
|
@ -656,7 +656,8 @@ def dbcheck():
|
||||||
'synced_version INTEGER, synced_version_profile TEXT, '
|
'synced_version INTEGER, synced_version_profile TEXT, '
|
||||||
'live INTEGER, live_uuid TEXT, channel_call_sign TEXT, channel_identifier TEXT, channel_thumb TEXT, '
|
'live INTEGER, live_uuid TEXT, channel_call_sign TEXT, channel_identifier TEXT, channel_thumb TEXT, '
|
||||||
'secure INTEGER, relayed INTEGER, '
|
'secure INTEGER, relayed INTEGER, '
|
||||||
'buffer_count INTEGER DEFAULT 0, buffer_last_triggered INTEGER, last_paused INTEGER, watched INTEGER DEFAULT 0, '
|
'buffer_count INTEGER DEFAULT 0, buffer_last_triggered INTEGER, last_paused INTEGER, '
|
||||||
|
'watched INTEGER DEFAULT 0, intro INTEGER DEFAULT 0, credits INTEGER DEFAULT 0, '
|
||||||
'initial_stream INTEGER DEFAULT 1, write_attempts INTEGER DEFAULT 0, raw_stream_info TEXT, '
|
'initial_stream INTEGER DEFAULT 1, write_attempts INTEGER DEFAULT 0, raw_stream_info TEXT, '
|
||||||
'rating_key_websocket TEXT)'
|
'rating_key_websocket TEXT)'
|
||||||
)
|
)
|
||||||
|
@ -1401,6 +1402,18 @@ def dbcheck():
|
||||||
'ALTER TABLE sessions ADD COLUMN stream_subtitle_forced INTEGER'
|
'ALTER TABLE sessions ADD COLUMN stream_subtitle_forced INTEGER'
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Upgrade sessions table from earlier versions
|
||||||
|
try:
|
||||||
|
c_db.execute('SELECT intro FROM sessions')
|
||||||
|
except sqlite3.OperationalError:
|
||||||
|
logger.debug(u"Altering database. Updating database table sessions.")
|
||||||
|
c_db.execute(
|
||||||
|
'ALTER TABLE sessions ADD COLUMN intro INTEGER DEFAULT 0'
|
||||||
|
)
|
||||||
|
c_db.execute(
|
||||||
|
'ALTER TABLE sessions ADD COLUMN credits INTEGER DEFAULT 0'
|
||||||
|
)
|
||||||
|
|
||||||
# Upgrade session_history table from earlier versions
|
# Upgrade session_history table from earlier versions
|
||||||
try:
|
try:
|
||||||
c_db.execute('SELECT reference_id FROM session_history')
|
c_db.execute('SELECT reference_id FROM session_history')
|
||||||
|
|
|
@ -51,7 +51,11 @@ RECENTLY_ADDED_QUEUE = {}
|
||||||
class ActivityHandler(object):
|
class ActivityHandler(object):
|
||||||
|
|
||||||
def __init__(self, timeline):
|
def __init__(self, timeline):
|
||||||
|
self.ap = activity_processor.ActivityProcessor()
|
||||||
self.timeline = timeline
|
self.timeline = timeline
|
||||||
|
self.db_session = None
|
||||||
|
self.session = None
|
||||||
|
self.metadata = None
|
||||||
|
|
||||||
def is_valid_session(self):
|
def is_valid_session(self):
|
||||||
if 'sessionKey' in self.timeline:
|
if 'sessionKey' in self.timeline:
|
||||||
|
@ -72,15 +76,18 @@ class ActivityHandler(object):
|
||||||
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
def get_db_session(self):
|
||||||
|
# Retrieve the session data from our temp table
|
||||||
|
self.db_session = self.ap.get_session_by_key(session_key=self.get_session_key())
|
||||||
|
|
||||||
def get_metadata(self, skip_cache=False):
|
def get_metadata(self, skip_cache=False):
|
||||||
cache_key = None if skip_cache else self.get_session_key()
|
if self.metadata is None:
|
||||||
pms_connect = pmsconnect.PmsConnect()
|
cache_key = None if skip_cache else self.get_session_key()
|
||||||
metadata = pms_connect.get_metadata_details(rating_key=self.get_rating_key(), cache_key=cache_key)
|
pms_connect = pmsconnect.PmsConnect()
|
||||||
|
metadata = pms_connect.get_metadata_details(rating_key=self.get_rating_key(), cache_key=cache_key)
|
||||||
|
|
||||||
if metadata:
|
if metadata:
|
||||||
return metadata
|
self.metadata = metadata
|
||||||
|
|
||||||
return None
|
|
||||||
|
|
||||||
def get_live_session(self, skip_cache=False):
|
def get_live_session(self, skip_cache=False):
|
||||||
pms_connect = pmsconnect.PmsConnect()
|
pms_connect = pmsconnect.PmsConnect()
|
||||||
|
@ -94,196 +101,179 @@ class ActivityHandler(object):
|
||||||
if not session['rating_key']:
|
if not session['rating_key']:
|
||||||
session['rating_key'] = self.get_rating_key()
|
session['rating_key'] = self.get_rating_key()
|
||||||
session['rating_key_websocket'] = self.get_rating_key()
|
session['rating_key_websocket'] = self.get_rating_key()
|
||||||
|
self.session = session
|
||||||
return session
|
return session
|
||||||
|
|
||||||
return None
|
def update_db_session(self, notify=False):
|
||||||
|
if self.session is None:
|
||||||
|
self.get_live_session()
|
||||||
|
|
||||||
def update_db_session(self, session=None, notify=False):
|
if self.session:
|
||||||
if session is None:
|
|
||||||
session = self.get_live_session()
|
|
||||||
|
|
||||||
if session:
|
|
||||||
# Update our session temp table values
|
# Update our session temp table values
|
||||||
ap = activity_processor.ActivityProcessor()
|
self.ap.write_session(session=self.session, notify=notify)
|
||||||
ap.write_session(session=session, notify=notify)
|
|
||||||
|
|
||||||
self.set_session_state()
|
self.set_session_state()
|
||||||
|
self.get_db_session()
|
||||||
|
|
||||||
def set_session_state(self):
|
def set_session_state(self):
|
||||||
ap = activity_processor.ActivityProcessor()
|
self.ap.set_session_state(session_key=self.get_session_key(),
|
||||||
ap.set_session_state(session_key=self.get_session_key(),
|
|
||||||
state=self.timeline['state'],
|
state=self.timeline['state'],
|
||||||
view_offset=self.timeline['viewOffset'],
|
view_offset=self.timeline['viewOffset'],
|
||||||
stopped=helpers.timestamp())
|
stopped=helpers.timestamp())
|
||||||
|
|
||||||
|
def put_notification(self, notify_action, **kwargs):
|
||||||
|
notification = {'stream_data': self.db_session.copy(), 'notify_action': notify_action}
|
||||||
|
notification.update(kwargs)
|
||||||
|
plexpy.NOTIFY_QUEUE.put(notification)
|
||||||
|
|
||||||
def on_start(self):
|
def on_start(self):
|
||||||
if self.is_valid_session():
|
self.get_live_session(skip_cache=True)
|
||||||
session = self.get_live_session(skip_cache=True)
|
|
||||||
|
|
||||||
if not session:
|
if not self.session:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Some DLNA clients create a new session temporarily when browsing the library
|
||||||
|
# Wait and get session again to make sure it is an actual session
|
||||||
|
if self.session['platform'] == 'DLNA':
|
||||||
|
time.sleep(1)
|
||||||
|
self.get_live_session()
|
||||||
|
if not self.session:
|
||||||
return
|
return
|
||||||
|
|
||||||
# Some DLNA clients create a new session temporarily when browsing the library
|
logger.debug("Tautulli ActivityHandler :: Session %s started by user %s (%s) with ratingKey %s (%s)%s."
|
||||||
# Wait and get session again to make sure it is an actual session
|
% (str(self.session['session_key']), str(self.session['user_id']), self.session['username'],
|
||||||
if session['platform'] == 'DLNA':
|
str(self.session['rating_key']), self.session['full_title'], '[Live TV]' if self.session['live'] else ''))
|
||||||
time.sleep(1)
|
|
||||||
session = self.get_live_session()
|
|
||||||
if not session:
|
|
||||||
return
|
|
||||||
|
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s started by user %s (%s) with ratingKey %s (%s)%s."
|
# Write the new session to our temp session table
|
||||||
% (str(session['session_key']), str(session['user_id']), session['username'],
|
self.update_db_session(notify=True)
|
||||||
str(session['rating_key']), session['full_title'], '[Live TV]' if session['live'] else ''))
|
|
||||||
|
|
||||||
# Send notification after updating db
|
# Schedule a callback to force stop a stale stream 5 minutes later
|
||||||
#plexpy.NOTIFY_QUEUE.put({'stream_data': session.copy(), 'notify_action': 'on_play'})
|
schedule_callback('session_key-{}'.format(self.get_session_key()),
|
||||||
|
func=force_stop_stream,
|
||||||
# Write the new session to our temp session table
|
args=[self.get_session_key(), self.session['full_title'], self.session['username']],
|
||||||
self.update_db_session(session=session, notify=True)
|
minutes=5)
|
||||||
|
|
||||||
# Schedule a callback to force stop a stale stream 5 minutes later
|
self.check_markers()
|
||||||
schedule_callback('session_key-{}'.format(self.get_session_key()),
|
|
||||||
func=force_stop_stream,
|
|
||||||
args=[self.get_session_key(), session['full_title'], session['username']],
|
|
||||||
minutes=5)
|
|
||||||
|
|
||||||
def on_stop(self, force_stop=False):
|
def on_stop(self, force_stop=False):
|
||||||
if self.is_valid_session():
|
logger.debug("Tautulli ActivityHandler :: Session %s %sstopped."
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s %sstopped."
|
% (str(self.get_session_key()), 'force ' if force_stop else ''))
|
||||||
% (str(self.get_session_key()), 'force ' if force_stop else ''))
|
|
||||||
|
|
||||||
# Set the session last_paused timestamp
|
# Set the session last_paused timestamp
|
||||||
ap = activity_processor.ActivityProcessor()
|
self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None)
|
||||||
ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None)
|
|
||||||
|
|
||||||
# Update the session state and viewOffset
|
# Update the session state and viewOffset
|
||||||
# Set force_stop to true to disable the state set
|
# Set force_stop to true to disable the state set
|
||||||
if not force_stop:
|
if not force_stop:
|
||||||
self.set_session_state()
|
self.set_session_state()
|
||||||
|
|
||||||
# Retrieve the session data from our temp table
|
# Write it to the history table
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
row_id = self.ap.write_session_history(session=self.db_session)
|
||||||
|
|
||||||
# Write it to the history table
|
if row_id:
|
||||||
monitor_proc = activity_processor.ActivityProcessor()
|
self.put_notification('on_stop')
|
||||||
row_id = monitor_proc.write_session_history(session=db_session)
|
|
||||||
|
|
||||||
if row_id:
|
schedule_callback('session_key-{}'.format(self.get_session_key()), remove_job=True)
|
||||||
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_stop'})
|
|
||||||
|
|
||||||
schedule_callback('session_key-{}'.format(self.get_session_key()), remove_job=True)
|
# Remove the session from our temp session table
|
||||||
|
logger.debug("Tautulli ActivityHandler :: Removing sessionKey %s ratingKey %s from session queue"
|
||||||
# Remove the session from our temp session table
|
% (str(self.get_session_key()), str(self.get_rating_key())))
|
||||||
logger.debug("Tautulli ActivityHandler :: Removing sessionKey %s ratingKey %s from session queue"
|
self.ap.delete_session(row_id=row_id)
|
||||||
% (str(self.get_session_key()), str(self.get_rating_key())))
|
delete_metadata_cache(self.get_session_key())
|
||||||
ap.delete_session(row_id=row_id)
|
else:
|
||||||
delete_metadata_cache(self.get_session_key())
|
schedule_callback('session_key-{}'.format(self.get_session_key()),
|
||||||
else:
|
func=force_stop_stream,
|
||||||
schedule_callback('session_key-{}'.format(self.get_session_key()),
|
args=[self.get_session_key(), self.db_session['full_title'], self.db_session['user']],
|
||||||
func=force_stop_stream,
|
seconds=30)
|
||||||
args=[self.get_session_key(), db_session['full_title'], db_session['user']],
|
|
||||||
seconds=30)
|
|
||||||
|
|
||||||
def on_pause(self, still_paused=False):
|
def on_pause(self, still_paused=False):
|
||||||
if self.is_valid_session():
|
if not still_paused:
|
||||||
if not still_paused:
|
logger.debug("Tautulli ActivityHandler :: Session %s paused." % str(self.get_session_key()))
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s paused." % str(self.get_session_key()))
|
|
||||||
|
|
||||||
# Set the session last_paused timestamp
|
# Set the session last_paused timestamp
|
||||||
ap = activity_processor.ActivityProcessor()
|
self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=helpers.timestamp())
|
||||||
ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=helpers.timestamp())
|
|
||||||
|
|
||||||
# Update the session state and viewOffset
|
self.update_db_session()
|
||||||
self.update_db_session()
|
|
||||||
|
|
||||||
# Retrieve the session data from our temp table
|
if not still_paused:
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
self.put_notification('on_pause')
|
||||||
|
|
||||||
if not still_paused:
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_pause'})
|
|
||||||
|
|
||||||
def on_resume(self):
|
def on_resume(self):
|
||||||
if self.is_valid_session():
|
logger.debug("Tautulli ActivityHandler :: Session %s resumed." % str(self.get_session_key()))
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s resumed." % str(self.get_session_key()))
|
|
||||||
|
|
||||||
# Set the session last_paused timestamp
|
# Set the session last_paused timestamp
|
||||||
ap = activity_processor.ActivityProcessor()
|
self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None)
|
||||||
ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None)
|
|
||||||
|
|
||||||
# Update the session state and viewOffset
|
self.update_db_session()
|
||||||
self.update_db_session()
|
|
||||||
|
|
||||||
# Retrieve the session data from our temp table
|
self.put_notification('on_resume')
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_resume'})
|
|
||||||
|
|
||||||
def on_change(self):
|
|
||||||
if self.is_valid_session():
|
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s has changed transcode decision." % str(self.get_session_key()))
|
|
||||||
|
|
||||||
# Update the session state and viewOffset
|
|
||||||
self.update_db_session()
|
|
||||||
|
|
||||||
# Retrieve the session data from our temp table
|
|
||||||
ap = activity_processor.ActivityProcessor()
|
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_change'})
|
|
||||||
|
|
||||||
def on_buffer(self):
|
def on_buffer(self):
|
||||||
if self.is_valid_session():
|
logger.debug("Tautulli ActivityHandler :: Session %s is buffering." % self.get_session_key())
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s is buffering." % self.get_session_key())
|
|
||||||
ap = activity_processor.ActivityProcessor()
|
|
||||||
db_stream = ap.get_session_by_key(session_key=self.get_session_key())
|
|
||||||
|
|
||||||
# Increment our buffer count
|
# Increment our buffer count
|
||||||
ap.increment_session_buffer_count(session_key=self.get_session_key())
|
self.ap.increment_session_buffer_count(session_key=self.get_session_key())
|
||||||
|
|
||||||
# Get our current buffer count
|
# Get our current buffer count
|
||||||
current_buffer_count = ap.get_session_buffer_count(self.get_session_key())
|
current_buffer_count = self.ap.get_session_buffer_count(self.get_session_key())
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s buffer count is %s." %
|
logger.debug("Tautulli ActivityHandler :: Session %s buffer count is %s." %
|
||||||
(self.get_session_key(), current_buffer_count))
|
(self.get_session_key(), current_buffer_count))
|
||||||
|
|
||||||
# Get our last triggered time
|
# Get our last triggered time
|
||||||
buffer_last_triggered = ap.get_session_buffer_trigger_time(self.get_session_key())
|
buffer_last_triggered = self.ap.get_session_buffer_trigger_time(self.get_session_key())
|
||||||
|
|
||||||
# Update the session state and viewOffset
|
self.update_db_session()
|
||||||
self.update_db_session()
|
|
||||||
|
|
||||||
time_since_last_trigger = 0
|
time_since_last_trigger = 0
|
||||||
if buffer_last_triggered:
|
if buffer_last_triggered:
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s buffer last triggered at %s." %
|
logger.debug("Tautulli ActivityHandler :: Session %s buffer last triggered at %s." %
|
||||||
(self.get_session_key(), buffer_last_triggered))
|
(self.get_session_key(), buffer_last_triggered))
|
||||||
time_since_last_trigger = helpers.timestamp() - int(buffer_last_triggered)
|
time_since_last_trigger = helpers.timestamp() - int(buffer_last_triggered)
|
||||||
|
|
||||||
if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \
|
if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \
|
||||||
time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT:
|
time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT:
|
||||||
ap.set_session_buffer_trigger_time(session_key=self.get_session_key())
|
self.ap.set_session_buffer_trigger_time(session_key=self.get_session_key())
|
||||||
|
|
||||||
# Retrieve the session data from our temp table
|
self.put_notification('on_buffer')
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_buffer'})
|
|
||||||
|
|
||||||
def on_error(self):
|
def on_error(self):
|
||||||
if self.is_valid_session():
|
logger.debug("Tautulli ActivityHandler :: Session %s encountered an error." % str(self.get_session_key()))
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s encountered an error." % str(self.get_session_key()))
|
|
||||||
|
|
||||||
# Update the session state and viewOffset
|
self.update_db_session()
|
||||||
self.update_db_session()
|
|
||||||
|
|
||||||
# Retrieve the session data from our temp table
|
self.put_notification('on_error')
|
||||||
ap = activity_processor.ActivityProcessor()
|
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
|
||||||
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_error'})
|
def on_change(self):
|
||||||
|
logger.debug("Tautulli ActivityHandler :: Session %s has changed transcode decision." % str(self.get_session_key()))
|
||||||
|
|
||||||
|
self.update_db_session()
|
||||||
|
|
||||||
|
self.put_notification('on_change')
|
||||||
|
|
||||||
|
def on_intro(self):
|
||||||
|
if self.get_live_session():
|
||||||
|
logger.debug("Tautulli ActivityHandler :: Session %s intro marker reached." % str(self.get_session_key()))
|
||||||
|
|
||||||
|
self.put_notification('on_intro')
|
||||||
|
|
||||||
|
def on_credits(self):
|
||||||
|
if self.get_live_session():
|
||||||
|
logger.debug("Tautulli ActivityHandler :: Session %s credits marker reached." % str(self.get_session_key()))
|
||||||
|
self.put_notification('on_credits')
|
||||||
|
|
||||||
|
def on_watched(self):
|
||||||
|
logger.debug("Tautulli ActivityHandler :: Session %s watched." % str(self.get_session_key()))
|
||||||
|
|
||||||
|
watched_notifiers = notification_handler.get_notify_state_enabled(
|
||||||
|
session=self.db_session, notify_action='on_watched', notified=False)
|
||||||
|
|
||||||
|
for d in watched_notifiers:
|
||||||
|
self.put_notification('on_watched', notifier_id=d['notifier_id'])
|
||||||
|
|
||||||
# This function receives events from our websocket connection
|
# This function receives events from our websocket connection
|
||||||
def process(self):
|
def process(self):
|
||||||
if self.is_valid_session():
|
if self.is_valid_session():
|
||||||
ap = activity_processor.ActivityProcessor()
|
self.get_db_session()
|
||||||
db_session = ap.get_session_by_key(session_key=self.get_session_key())
|
|
||||||
|
|
||||||
this_state = self.timeline['state']
|
this_state = self.timeline['state']
|
||||||
this_rating_key = str(self.timeline['ratingKey'])
|
this_rating_key = str(self.timeline['ratingKey'])
|
||||||
|
@ -294,27 +284,27 @@ class ActivityHandler(object):
|
||||||
this_live_uuid = this_key.split('/')[-1] if this_key.startswith('/livetv/sessions') else None
|
this_live_uuid = this_key.split('/')[-1] if this_key.startswith('/livetv/sessions') else None
|
||||||
|
|
||||||
# If we already have this session in the temp table, check for state changes
|
# If we already have this session in the temp table, check for state changes
|
||||||
if db_session:
|
if self.db_session:
|
||||||
# Re-schedule the callback to reset the 5 minutes timer
|
# Re-schedule the callback to reset the 5 minutes timer
|
||||||
schedule_callback('session_key-{}'.format(self.get_session_key()),
|
schedule_callback('session_key-{}'.format(self.get_session_key()),
|
||||||
func=force_stop_stream,
|
func=force_stop_stream,
|
||||||
args=[self.get_session_key(), db_session['full_title'], db_session['user']],
|
args=[self.get_session_key(), self.db_session['full_title'], self.db_session['user']],
|
||||||
minutes=5)
|
minutes=5)
|
||||||
|
|
||||||
last_state = db_session['state']
|
last_state = self.db_session['state']
|
||||||
last_rating_key = str(db_session['rating_key'])
|
last_rating_key = str(self.db_session['rating_key'])
|
||||||
last_live_uuid = db_session['live_uuid']
|
last_live_uuid = self.db_session['live_uuid']
|
||||||
last_transcode_key = db_session['transcode_key'].split('/')[-1]
|
last_transcode_key = self.db_session['transcode_key'].split('/')[-1]
|
||||||
last_paused = db_session['last_paused']
|
last_paused = self.db_session['last_paused']
|
||||||
last_rating_key_websocket = db_session['rating_key_websocket']
|
last_rating_key_websocket = self.db_session['rating_key_websocket']
|
||||||
last_guid = db_session['guid']
|
last_guid = self.db_session['guid']
|
||||||
|
|
||||||
this_guid = last_guid
|
this_guid = last_guid
|
||||||
# Check guid for live TV metadata every 60 seconds
|
# Check guid for live TV metadata every 60 seconds
|
||||||
if db_session['live'] and helpers.timestamp() - db_session['stopped'] > 60:
|
if self.db_session['live'] and helpers.timestamp() - self.db_session['stopped'] > 60:
|
||||||
metadata = self.get_metadata(skip_cache=True)
|
self.get_metadata(skip_cache=True)
|
||||||
if metadata:
|
if self.metadata:
|
||||||
this_guid = metadata['guid']
|
this_guid = self.metadata['guid']
|
||||||
|
|
||||||
# Make sure the same item is being played
|
# Make sure the same item is being played
|
||||||
if (this_rating_key == last_rating_key
|
if (this_rating_key == last_rating_key
|
||||||
|
@ -325,7 +315,7 @@ class ActivityHandler(object):
|
||||||
if this_state == 'playing':
|
if this_state == 'playing':
|
||||||
# Update the session in our temp session table
|
# Update the session in our temp session table
|
||||||
# if the last set temporary stopped time exceeds 60 seconds
|
# if the last set temporary stopped time exceeds 60 seconds
|
||||||
if helpers.timestamp() - db_session['stopped'] > 60:
|
if helpers.timestamp() - self.db_session['stopped'] > 60:
|
||||||
self.update_db_session()
|
self.update_db_session()
|
||||||
|
|
||||||
# Start our state checks
|
# Start our state checks
|
||||||
|
@ -356,33 +346,65 @@ class ActivityHandler(object):
|
||||||
self.on_stop(force_stop=True)
|
self.on_stop(force_stop=True)
|
||||||
self.on_start()
|
self.on_start()
|
||||||
|
|
||||||
# Monitor if the stream has reached the watch percentage for notifications
|
# Check for stream offset notifications
|
||||||
# The only purpose of this is for notifications
|
self.check_markers()
|
||||||
if not db_session['watched'] and this_state != 'buffering':
|
self.check_watched()
|
||||||
progress_percent = helpers.get_percent(self.timeline['viewOffset'], db_session['duration'])
|
|
||||||
watched_percent = {'movie': plexpy.CONFIG.MOVIE_WATCHED_PERCENT,
|
|
||||||
'episode': plexpy.CONFIG.TV_WATCHED_PERCENT,
|
|
||||||
'track': plexpy.CONFIG.MUSIC_WATCHED_PERCENT,
|
|
||||||
'clip': plexpy.CONFIG.TV_WATCHED_PERCENT
|
|
||||||
}
|
|
||||||
|
|
||||||
if progress_percent >= watched_percent.get(db_session['media_type'], 101):
|
|
||||||
logger.debug("Tautulli ActivityHandler :: Session %s watched."
|
|
||||||
% str(self.get_session_key()))
|
|
||||||
ap.set_watched(session_key=self.get_session_key())
|
|
||||||
|
|
||||||
watched_notifiers = notification_handler.get_notify_state_enabled(
|
|
||||||
session=db_session, notify_action='on_watched', notified=False)
|
|
||||||
|
|
||||||
for d in watched_notifiers:
|
|
||||||
plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(),
|
|
||||||
'notifier_id': d['notifier_id'],
|
|
||||||
'notify_action': 'on_watched'})
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# We don't have this session in our table yet, start a new one.
|
# We don't have this session in our table yet, start a new one.
|
||||||
if this_state != 'buffering':
|
if this_state != 'buffering':
|
||||||
self.on_start()
|
self.on_start()
|
||||||
|
|
||||||
|
def check_markers(self):
|
||||||
|
# Monitor if the stream has reached the intro or credit marker offsets
|
||||||
|
self.get_metadata()
|
||||||
|
|
||||||
|
intro_markers, credits_markers = [], []
|
||||||
|
for marker in self.metadata['markers']:
|
||||||
|
if marker['type'] == 'intro':
|
||||||
|
intro_markers.append(marker)
|
||||||
|
elif marker['type'] == 'credits':
|
||||||
|
credits_markers.append(marker)
|
||||||
|
|
||||||
|
self._check_marker('intro', intro_markers)
|
||||||
|
self._check_marker('credits', credits_markers)
|
||||||
|
|
||||||
|
def _check_marker(self, marker_type, markers):
|
||||||
|
if self.db_session[marker_type] < len(markers):
|
||||||
|
marker = markers[self.db_session[marker_type]]
|
||||||
|
|
||||||
|
# Websocket events only fire every 10 seconds
|
||||||
|
# Check if the marker is within 10 seconds of the current viewOffset
|
||||||
|
if marker['start_time_offset'] - 10000 <= self.timeline['viewOffset'] <= marker['end_time_offset']:
|
||||||
|
set_func = getattr(self.ap, 'set_{}'.format(marker_type))
|
||||||
|
callback_func = getattr(self, 'on_{}'.format(marker_type))
|
||||||
|
|
||||||
|
set_func(session_key=self.get_session_key())
|
||||||
|
|
||||||
|
if self.timeline['viewOffset'] < marker['start_time_offset']:
|
||||||
|
# Schedule a callback for the exact offset of the marker
|
||||||
|
schedule_callback(
|
||||||
|
'session_key-{}-{}-{}'.format(self.get_session_key(), marker_type, self.db_session[marker_type]),
|
||||||
|
func=callback_func,
|
||||||
|
milliseconds=marker['start_time_offset'] - self.timeline['viewOffset']
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
callback_func()
|
||||||
|
|
||||||
|
def check_watched(self):
|
||||||
|
# Monitor if the stream has reached the watch percentage for notifications
|
||||||
|
if not self.db_session['watched'] and self.timeline['state'] != 'buffering':
|
||||||
|
progress_percent = helpers.get_percent(self.timeline['viewOffset'], self.db_session['duration'])
|
||||||
|
watched_percent = {
|
||||||
|
'movie': plexpy.CONFIG.MOVIE_WATCHED_PERCENT,
|
||||||
|
'episode': plexpy.CONFIG.TV_WATCHED_PERCENT,
|
||||||
|
'track': plexpy.CONFIG.MUSIC_WATCHED_PERCENT,
|
||||||
|
'clip': plexpy.CONFIG.TV_WATCHED_PERCENT
|
||||||
|
}
|
||||||
|
|
||||||
|
if progress_percent >= watched_percent.get(self.db_session['media_type'], 101):
|
||||||
|
self.ap.set_watched(session_key=self.get_session_key())
|
||||||
|
self.on_watched()
|
||||||
|
|
||||||
|
|
||||||
class TimelineHandler(object):
|
class TimelineHandler(object):
|
||||||
|
|
|
@ -660,8 +660,18 @@ class ActivityProcessor(object):
|
||||||
self.db.action('UPDATE sessions SET write_attempts = ? WHERE session_key = ?',
|
self.db.action('UPDATE sessions SET write_attempts = ? WHERE session_key = ?',
|
||||||
[session['write_attempts'] + 1, session_key])
|
[session['write_attempts'] + 1, session_key])
|
||||||
|
|
||||||
|
def set_intro(self, session_key=None):
|
||||||
|
self.db.action('UPDATE sessions SET intro = intro + 1 '
|
||||||
|
'WHERE session_key = ?',
|
||||||
|
[session_key])
|
||||||
|
|
||||||
|
def set_credits(self, session_key=None):
|
||||||
|
self.db.action('UPDATE sessions SET credits = credits + 1 '
|
||||||
|
'WHERE session_key = ?',
|
||||||
|
[session_key])
|
||||||
|
|
||||||
def set_watched(self, session_key=None):
|
def set_watched(self, session_key=None):
|
||||||
self.db.action('UPDATE sessions SET watched = ?'
|
self.db.action('UPDATE sessions SET watched = ? '
|
||||||
'WHERE session_key = ?',
|
'WHERE session_key = ?',
|
||||||
[1, session_key])
|
[1, session_key])
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue