diff --git a/plexpy/__init__.py b/plexpy/__init__.py index d27cd396..003259bc 100644 --- a/plexpy/__init__.py +++ b/plexpy/__init__.py @@ -656,7 +656,8 @@ def dbcheck(): 'synced_version INTEGER, synced_version_profile TEXT, ' 'live INTEGER, live_uuid TEXT, channel_call_sign TEXT, channel_identifier TEXT, channel_thumb TEXT, ' 'secure INTEGER, relayed INTEGER, ' - 'buffer_count INTEGER DEFAULT 0, buffer_last_triggered INTEGER, last_paused INTEGER, watched INTEGER DEFAULT 0, ' + 'buffer_count INTEGER DEFAULT 0, buffer_last_triggered INTEGER, last_paused INTEGER, ' + 'watched INTEGER DEFAULT 0, intro INTEGER DEFAULT 0, credits INTEGER DEFAULT 0, ' 'initial_stream INTEGER DEFAULT 1, write_attempts INTEGER DEFAULT 0, raw_stream_info TEXT, ' 'rating_key_websocket TEXT)' ) @@ -1401,6 +1402,18 @@ def dbcheck(): 'ALTER TABLE sessions ADD COLUMN stream_subtitle_forced INTEGER' ) + # Upgrade sessions table from earlier versions + try: + c_db.execute('SELECT intro FROM sessions') + except sqlite3.OperationalError: + logger.debug(u"Altering database. Updating database table sessions.") + c_db.execute( + 'ALTER TABLE sessions ADD COLUMN intro INTEGER DEFAULT 0' + ) + c_db.execute( + 'ALTER TABLE sessions ADD COLUMN credits INTEGER DEFAULT 0' + ) + # Upgrade session_history table from earlier versions try: c_db.execute('SELECT reference_id FROM session_history') diff --git a/plexpy/activity_handler.py b/plexpy/activity_handler.py index 07d0f8e3..a89ccb98 100644 --- a/plexpy/activity_handler.py +++ b/plexpy/activity_handler.py @@ -51,7 +51,11 @@ RECENTLY_ADDED_QUEUE = {} class ActivityHandler(object): def __init__(self, timeline): + self.ap = activity_processor.ActivityProcessor() self.timeline = timeline + self.db_session = None + self.session = None + self.metadata = None def is_valid_session(self): if 'sessionKey' in self.timeline: @@ -72,15 +76,18 @@ class ActivityHandler(object): return None + def get_db_session(self): + # Retrieve the session data from our temp table + self.db_session = self.ap.get_session_by_key(session_key=self.get_session_key()) + def get_metadata(self, skip_cache=False): - cache_key = None if skip_cache else self.get_session_key() - pms_connect = pmsconnect.PmsConnect() - metadata = pms_connect.get_metadata_details(rating_key=self.get_rating_key(), cache_key=cache_key) + if self.metadata is None: + cache_key = None if skip_cache else self.get_session_key() + pms_connect = pmsconnect.PmsConnect() + metadata = pms_connect.get_metadata_details(rating_key=self.get_rating_key(), cache_key=cache_key) - if metadata: - return metadata - - return None + if metadata: + self.metadata = metadata def get_live_session(self, skip_cache=False): pms_connect = pmsconnect.PmsConnect() @@ -94,196 +101,179 @@ class ActivityHandler(object): if not session['rating_key']: session['rating_key'] = self.get_rating_key() session['rating_key_websocket'] = self.get_rating_key() + self.session = session return session - return None + def update_db_session(self, notify=False): + if self.session is None: + self.get_live_session() - def update_db_session(self, session=None, notify=False): - if session is None: - session = self.get_live_session() - - if session: + if self.session: # Update our session temp table values - ap = activity_processor.ActivityProcessor() - ap.write_session(session=session, notify=notify) + self.ap.write_session(session=self.session, notify=notify) self.set_session_state() + self.get_db_session() def set_session_state(self): - ap = activity_processor.ActivityProcessor() - ap.set_session_state(session_key=self.get_session_key(), + self.ap.set_session_state(session_key=self.get_session_key(), state=self.timeline['state'], view_offset=self.timeline['viewOffset'], stopped=helpers.timestamp()) + + def put_notification(self, notify_action, **kwargs): + notification = {'stream_data': self.db_session.copy(), 'notify_action': notify_action} + notification.update(kwargs) + plexpy.NOTIFY_QUEUE.put(notification) def on_start(self): - if self.is_valid_session(): - session = self.get_live_session(skip_cache=True) + self.get_live_session(skip_cache=True) - if not session: + if not self.session: + return + + # Some DLNA clients create a new session temporarily when browsing the library + # Wait and get session again to make sure it is an actual session + if self.session['platform'] == 'DLNA': + time.sleep(1) + self.get_live_session() + if not self.session: return - # Some DLNA clients create a new session temporarily when browsing the library - # Wait and get session again to make sure it is an actual session - if session['platform'] == 'DLNA': - time.sleep(1) - session = self.get_live_session() - if not session: - return + logger.debug("Tautulli ActivityHandler :: Session %s started by user %s (%s) with ratingKey %s (%s)%s." + % (str(self.session['session_key']), str(self.session['user_id']), self.session['username'], + str(self.session['rating_key']), self.session['full_title'], '[Live TV]' if self.session['live'] else '')) - logger.debug("Tautulli ActivityHandler :: Session %s started by user %s (%s) with ratingKey %s (%s)%s." - % (str(session['session_key']), str(session['user_id']), session['username'], - str(session['rating_key']), session['full_title'], '[Live TV]' if session['live'] else '')) + # Write the new session to our temp session table + self.update_db_session(notify=True) - # Send notification after updating db - #plexpy.NOTIFY_QUEUE.put({'stream_data': session.copy(), 'notify_action': 'on_play'}) - - # Write the new session to our temp session table - self.update_db_session(session=session, notify=True) - - # Schedule a callback to force stop a stale stream 5 minutes later - schedule_callback('session_key-{}'.format(self.get_session_key()), - func=force_stop_stream, - args=[self.get_session_key(), session['full_title'], session['username']], - minutes=5) + # Schedule a callback to force stop a stale stream 5 minutes later + schedule_callback('session_key-{}'.format(self.get_session_key()), + func=force_stop_stream, + args=[self.get_session_key(), self.session['full_title'], self.session['username']], + minutes=5) + + self.check_markers() def on_stop(self, force_stop=False): - if self.is_valid_session(): - logger.debug("Tautulli ActivityHandler :: Session %s %sstopped." - % (str(self.get_session_key()), 'force ' if force_stop else '')) + logger.debug("Tautulli ActivityHandler :: Session %s %sstopped." + % (str(self.get_session_key()), 'force ' if force_stop else '')) - # Set the session last_paused timestamp - ap = activity_processor.ActivityProcessor() - ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None) + # Set the session last_paused timestamp + self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None) - # Update the session state and viewOffset - # Set force_stop to true to disable the state set - if not force_stop: - self.set_session_state() + # Update the session state and viewOffset + # Set force_stop to true to disable the state set + if not force_stop: + self.set_session_state() - # Retrieve the session data from our temp table - db_session = ap.get_session_by_key(session_key=self.get_session_key()) + # Write it to the history table + row_id = self.ap.write_session_history(session=self.db_session) - # Write it to the history table - monitor_proc = activity_processor.ActivityProcessor() - row_id = monitor_proc.write_session_history(session=db_session) + if row_id: + self.put_notification('on_stop') - if row_id: - plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_stop'}) + schedule_callback('session_key-{}'.format(self.get_session_key()), remove_job=True) - schedule_callback('session_key-{}'.format(self.get_session_key()), remove_job=True) - - # Remove the session from our temp session table - logger.debug("Tautulli ActivityHandler :: Removing sessionKey %s ratingKey %s from session queue" - % (str(self.get_session_key()), str(self.get_rating_key()))) - ap.delete_session(row_id=row_id) - delete_metadata_cache(self.get_session_key()) - else: - schedule_callback('session_key-{}'.format(self.get_session_key()), - func=force_stop_stream, - args=[self.get_session_key(), db_session['full_title'], db_session['user']], - seconds=30) + # Remove the session from our temp session table + logger.debug("Tautulli ActivityHandler :: Removing sessionKey %s ratingKey %s from session queue" + % (str(self.get_session_key()), str(self.get_rating_key()))) + self.ap.delete_session(row_id=row_id) + delete_metadata_cache(self.get_session_key()) + else: + schedule_callback('session_key-{}'.format(self.get_session_key()), + func=force_stop_stream, + args=[self.get_session_key(), self.db_session['full_title'], self.db_session['user']], + seconds=30) def on_pause(self, still_paused=False): - if self.is_valid_session(): - if not still_paused: - logger.debug("Tautulli ActivityHandler :: Session %s paused." % str(self.get_session_key())) + if not still_paused: + logger.debug("Tautulli ActivityHandler :: Session %s paused." % str(self.get_session_key())) - # Set the session last_paused timestamp - ap = activity_processor.ActivityProcessor() - ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=helpers.timestamp()) + # Set the session last_paused timestamp + self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=helpers.timestamp()) - # Update the session state and viewOffset - self.update_db_session() + self.update_db_session() - # Retrieve the session data from our temp table - db_session = ap.get_session_by_key(session_key=self.get_session_key()) - - if not still_paused: - plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_pause'}) + if not still_paused: + self.put_notification('on_pause') def on_resume(self): - if self.is_valid_session(): - logger.debug("Tautulli ActivityHandler :: Session %s resumed." % str(self.get_session_key())) + logger.debug("Tautulli ActivityHandler :: Session %s resumed." % str(self.get_session_key())) - # Set the session last_paused timestamp - ap = activity_processor.ActivityProcessor() - ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None) + # Set the session last_paused timestamp + self.ap.set_session_last_paused(session_key=self.get_session_key(), timestamp=None) - # Update the session state and viewOffset - self.update_db_session() + self.update_db_session() - # Retrieve the session data from our temp table - db_session = ap.get_session_by_key(session_key=self.get_session_key()) - - plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_resume'}) - - def on_change(self): - if self.is_valid_session(): - logger.debug("Tautulli ActivityHandler :: Session %s has changed transcode decision." % str(self.get_session_key())) - - # Update the session state and viewOffset - self.update_db_session() - - # Retrieve the session data from our temp table - ap = activity_processor.ActivityProcessor() - db_session = ap.get_session_by_key(session_key=self.get_session_key()) - - plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_change'}) + self.put_notification('on_resume') def on_buffer(self): - if self.is_valid_session(): - logger.debug("Tautulli ActivityHandler :: Session %s is buffering." % self.get_session_key()) - ap = activity_processor.ActivityProcessor() - db_stream = ap.get_session_by_key(session_key=self.get_session_key()) + logger.debug("Tautulli ActivityHandler :: Session %s is buffering." % self.get_session_key()) - # Increment our buffer count - ap.increment_session_buffer_count(session_key=self.get_session_key()) + # Increment our buffer count + self.ap.increment_session_buffer_count(session_key=self.get_session_key()) - # Get our current buffer count - current_buffer_count = ap.get_session_buffer_count(self.get_session_key()) - logger.debug("Tautulli ActivityHandler :: Session %s buffer count is %s." % - (self.get_session_key(), current_buffer_count)) + # Get our current buffer count + current_buffer_count = self.ap.get_session_buffer_count(self.get_session_key()) + logger.debug("Tautulli ActivityHandler :: Session %s buffer count is %s." % + (self.get_session_key(), current_buffer_count)) - # Get our last triggered time - buffer_last_triggered = ap.get_session_buffer_trigger_time(self.get_session_key()) + # Get our last triggered time + buffer_last_triggered = self.ap.get_session_buffer_trigger_time(self.get_session_key()) - # Update the session state and viewOffset - self.update_db_session() + self.update_db_session() - time_since_last_trigger = 0 - if buffer_last_triggered: - logger.debug("Tautulli ActivityHandler :: Session %s buffer last triggered at %s." % - (self.get_session_key(), buffer_last_triggered)) - time_since_last_trigger = helpers.timestamp() - int(buffer_last_triggered) + time_since_last_trigger = 0 + if buffer_last_triggered: + logger.debug("Tautulli ActivityHandler :: Session %s buffer last triggered at %s." % + (self.get_session_key(), buffer_last_triggered)) + time_since_last_trigger = helpers.timestamp() - int(buffer_last_triggered) - if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \ - time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT: - ap.set_session_buffer_trigger_time(session_key=self.get_session_key()) + if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \ + time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT: + self.ap.set_session_buffer_trigger_time(session_key=self.get_session_key()) - # Retrieve the session data from our temp table - db_session = ap.get_session_by_key(session_key=self.get_session_key()) - - plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_buffer'}) + self.put_notification('on_buffer') def on_error(self): - if self.is_valid_session(): - logger.debug("Tautulli ActivityHandler :: Session %s encountered an error." % str(self.get_session_key())) + logger.debug("Tautulli ActivityHandler :: Session %s encountered an error." % str(self.get_session_key())) - # Update the session state and viewOffset - self.update_db_session() + self.update_db_session() - # Retrieve the session data from our temp table - ap = activity_processor.ActivityProcessor() - db_session = ap.get_session_by_key(session_key=self.get_session_key()) + self.put_notification('on_error') - plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), 'notify_action': 'on_error'}) + def on_change(self): + logger.debug("Tautulli ActivityHandler :: Session %s has changed transcode decision." % str(self.get_session_key())) + + self.update_db_session() + + self.put_notification('on_change') + + def on_intro(self): + if self.get_live_session(): + logger.debug("Tautulli ActivityHandler :: Session %s intro marker reached." % str(self.get_session_key())) + + self.put_notification('on_intro') + + def on_credits(self): + if self.get_live_session(): + logger.debug("Tautulli ActivityHandler :: Session %s credits marker reached." % str(self.get_session_key())) + self.put_notification('on_credits') + + def on_watched(self): + logger.debug("Tautulli ActivityHandler :: Session %s watched." % str(self.get_session_key())) + + watched_notifiers = notification_handler.get_notify_state_enabled( + session=self.db_session, notify_action='on_watched', notified=False) + + for d in watched_notifiers: + self.put_notification('on_watched', notifier_id=d['notifier_id']) # This function receives events from our websocket connection def process(self): if self.is_valid_session(): - ap = activity_processor.ActivityProcessor() - db_session = ap.get_session_by_key(session_key=self.get_session_key()) + self.get_db_session() this_state = self.timeline['state'] this_rating_key = str(self.timeline['ratingKey']) @@ -294,27 +284,27 @@ class ActivityHandler(object): this_live_uuid = this_key.split('/')[-1] if this_key.startswith('/livetv/sessions') else None # If we already have this session in the temp table, check for state changes - if db_session: + if self.db_session: # Re-schedule the callback to reset the 5 minutes timer schedule_callback('session_key-{}'.format(self.get_session_key()), func=force_stop_stream, - args=[self.get_session_key(), db_session['full_title'], db_session['user']], + args=[self.get_session_key(), self.db_session['full_title'], self.db_session['user']], minutes=5) - last_state = db_session['state'] - last_rating_key = str(db_session['rating_key']) - last_live_uuid = db_session['live_uuid'] - last_transcode_key = db_session['transcode_key'].split('/')[-1] - last_paused = db_session['last_paused'] - last_rating_key_websocket = db_session['rating_key_websocket'] - last_guid = db_session['guid'] + last_state = self.db_session['state'] + last_rating_key = str(self.db_session['rating_key']) + last_live_uuid = self.db_session['live_uuid'] + last_transcode_key = self.db_session['transcode_key'].split('/')[-1] + last_paused = self.db_session['last_paused'] + last_rating_key_websocket = self.db_session['rating_key_websocket'] + last_guid = self.db_session['guid'] this_guid = last_guid # Check guid for live TV metadata every 60 seconds - if db_session['live'] and helpers.timestamp() - db_session['stopped'] > 60: - metadata = self.get_metadata(skip_cache=True) - if metadata: - this_guid = metadata['guid'] + if self.db_session['live'] and helpers.timestamp() - self.db_session['stopped'] > 60: + self.get_metadata(skip_cache=True) + if self.metadata: + this_guid = self.metadata['guid'] # Make sure the same item is being played if (this_rating_key == last_rating_key @@ -325,7 +315,7 @@ class ActivityHandler(object): if this_state == 'playing': # Update the session in our temp session table # if the last set temporary stopped time exceeds 60 seconds - if helpers.timestamp() - db_session['stopped'] > 60: + if helpers.timestamp() - self.db_session['stopped'] > 60: self.update_db_session() # Start our state checks @@ -356,33 +346,65 @@ class ActivityHandler(object): self.on_stop(force_stop=True) self.on_start() - # Monitor if the stream has reached the watch percentage for notifications - # The only purpose of this is for notifications - if not db_session['watched'] and this_state != 'buffering': - progress_percent = helpers.get_percent(self.timeline['viewOffset'], db_session['duration']) - watched_percent = {'movie': plexpy.CONFIG.MOVIE_WATCHED_PERCENT, - 'episode': plexpy.CONFIG.TV_WATCHED_PERCENT, - 'track': plexpy.CONFIG.MUSIC_WATCHED_PERCENT, - 'clip': plexpy.CONFIG.TV_WATCHED_PERCENT - } - - if progress_percent >= watched_percent.get(db_session['media_type'], 101): - logger.debug("Tautulli ActivityHandler :: Session %s watched." - % str(self.get_session_key())) - ap.set_watched(session_key=self.get_session_key()) - - watched_notifiers = notification_handler.get_notify_state_enabled( - session=db_session, notify_action='on_watched', notified=False) - - for d in watched_notifiers: - plexpy.NOTIFY_QUEUE.put({'stream_data': db_session.copy(), - 'notifier_id': d['notifier_id'], - 'notify_action': 'on_watched'}) + # Check for stream offset notifications + self.check_markers() + self.check_watched() else: # We don't have this session in our table yet, start a new one. if this_state != 'buffering': self.on_start() + + def check_markers(self): + # Monitor if the stream has reached the intro or credit marker offsets + self.get_metadata() + + intro_markers, credits_markers = [], [] + for marker in self.metadata['markers']: + if marker['type'] == 'intro': + intro_markers.append(marker) + elif marker['type'] == 'credits': + credits_markers.append(marker) + + self._check_marker('intro', intro_markers) + self._check_marker('credits', credits_markers) + + def _check_marker(self, marker_type, markers): + if self.db_session[marker_type] < len(markers): + marker = markers[self.db_session[marker_type]] + + # Websocket events only fire every 10 seconds + # Check if the marker is within 10 seconds of the current viewOffset + if marker['start_time_offset'] - 10000 <= self.timeline['viewOffset'] <= marker['end_time_offset']: + set_func = getattr(self.ap, 'set_{}'.format(marker_type)) + callback_func = getattr(self, 'on_{}'.format(marker_type)) + + set_func(session_key=self.get_session_key()) + + if self.timeline['viewOffset'] < marker['start_time_offset']: + # Schedule a callback for the exact offset of the marker + schedule_callback( + 'session_key-{}-{}-{}'.format(self.get_session_key(), marker_type, self.db_session[marker_type]), + func=callback_func, + milliseconds=marker['start_time_offset'] - self.timeline['viewOffset'] + ) + else: + callback_func() + + def check_watched(self): + # Monitor if the stream has reached the watch percentage for notifications + if not self.db_session['watched'] and self.timeline['state'] != 'buffering': + progress_percent = helpers.get_percent(self.timeline['viewOffset'], self.db_session['duration']) + watched_percent = { + 'movie': plexpy.CONFIG.MOVIE_WATCHED_PERCENT, + 'episode': plexpy.CONFIG.TV_WATCHED_PERCENT, + 'track': plexpy.CONFIG.MUSIC_WATCHED_PERCENT, + 'clip': plexpy.CONFIG.TV_WATCHED_PERCENT + } + + if progress_percent >= watched_percent.get(self.db_session['media_type'], 101): + self.ap.set_watched(session_key=self.get_session_key()) + self.on_watched() class TimelineHandler(object): diff --git a/plexpy/activity_processor.py b/plexpy/activity_processor.py index 4608e4c8..e110ea64 100644 --- a/plexpy/activity_processor.py +++ b/plexpy/activity_processor.py @@ -660,8 +660,18 @@ class ActivityProcessor(object): self.db.action('UPDATE sessions SET write_attempts = ? WHERE session_key = ?', [session['write_attempts'] + 1, session_key]) + def set_intro(self, session_key=None): + self.db.action('UPDATE sessions SET intro = intro + 1 ' + 'WHERE session_key = ?', + [session_key]) + + def set_credits(self, session_key=None): + self.db.action('UPDATE sessions SET credits = credits + 1 ' + 'WHERE session_key = ?', + [session_key]) + def set_watched(self, session_key=None): - self.db.action('UPDATE sessions SET watched = ?' + self.db.action('UPDATE sessions SET watched = ? ' 'WHERE session_key = ?', [1, session_key])