diff --git a/data/interfaces/default/settings.html b/data/interfaces/default/settings.html index cd80ec4d..dc1fee3d 100644 --- a/data/interfaces/default/settings.html +++ b/data/interfaces/default/settings.html @@ -395,7 +395,7 @@ available_notification_agents = notifiers.available_notification_agents() -

Instead of polling the server at regular intervals let the server tell us when something happens.

+

Instead of polling the server at regular intervals let the server tell us when something happens. This is currently experimental.

diff --git a/plexpy/activity_handler.py b/plexpy/activity_handler.py index f056d93f..e9b78854 100644 --- a/plexpy/activity_handler.py +++ b/plexpy/activity_handler.py @@ -14,6 +14,8 @@ # along with PlexPy. If not, see . import time +import plexpy + from plexpy import logger, pmsconnect, activity_processor, threading, notification_handler @@ -88,9 +90,6 @@ class ActivityHandler(object): # Remove the session from our temp session table ap.delete_session(session_key=self.get_session_key()) - def on_buffer(self): - pass - def on_pause(self): if self.is_valid_session(): logger.debug(u"PlexPy ActivityHandler :: Session %s has been paused." % str(self.get_session_key())) @@ -131,9 +130,40 @@ class ActivityHandler(object): threading.Thread(target=notification_handler.notify, kwargs=dict(stream_data=db_session, notify_action='resume')).start() + def on_buffer(self): + if self.is_valid_session(): + logger.debug(u"PlexPy ActivityHandler :: Session %s is buffering." % self.get_session_key()) + ap = activity_processor.ActivityProcessor() + db_stream = ap.get_session_by_key(session_key=self.get_session_key()) + + # Increment our buffer count + ap.increment_session_buffer_count(session_key=self.get_session_key()) + + # Get our current buffer count + current_buffer_count = ap.get_session_buffer_count(self.get_session_key()) + logger.debug(u"PlexPy ActivityHandler :: Session %s buffer count is %s." % + (self.get_session_key(), current_buffer_count)) + + # Get our last triggered time + buffer_last_triggered = ap.get_session_buffer_trigger_time(self.get_session_key()) + + time_since_last_trigger = 0 + if buffer_last_triggered: + logger.debug(u"PlexPy ActivityHandler :: Session %s buffer last triggered at %s." % + (self.get_session_key(), buffer_last_triggered)) + time_since_last_trigger = int(time.time()) - int(buffer_last_triggered) + + if current_buffer_count >= plexpy.CONFIG.BUFFER_THRESHOLD and time_since_last_trigger == 0 or \ + time_since_last_trigger >= plexpy.CONFIG.BUFFER_WAIT: + ap.set_session_buffer_trigger_time(session_key=self.get_session_key()) + threading.Thread(target=notification_handler.notify, + kwargs=dict(stream_data=db_stream, notify_action='buffer')).start() + # This function receives events from our websocket connection def process(self): if self.is_valid_session(): + from plexpy import helpers + ap = activity_processor.ActivityProcessor() db_session = ap.get_session_by_key(session_key=self.get_session_key()) @@ -142,19 +172,24 @@ class ActivityHandler(object): this_state = self.timeline['state'] last_state = db_session['state'] + # Start our state checks if this_state != last_state: - # logger.debug(u"PlexPy ActivityHandler :: Last state %s :: Current state %s" % - # (last_state, this_state)) if this_state == 'paused': self.on_pause() elif last_state == 'paused' and this_state == 'playing': self.on_resume() elif this_state == 'stopped': self.on_stop() - # else: - # logger.debug(u"PlexPy ActivityHandler :: Session %s state has not changed." % - # self.get_session_key()) + elif this_state == 'buffering': + self.on_buffer() + + # Monitor if the stream has reached the watch percentage for notifications + # The only purpose of this is for notifications + progress_percent = helpers.get_percent(self.timeline['viewOffset'], db_session['duration']) + if progress_percent >= plexpy.CONFIG.NOTIFY_WATCHED_PERCENT and this_state != 'buffering': + threading.Thread(target=notification_handler.notify, + kwargs=dict(stream_data=db_session, notify_action='watched')).start() + else: # We don't have this session in our table yet, start a new one. - # logger.debug(u"PlexPy ActivityHandler :: Session %s has started." % self.get_session_key()) self.on_start() \ No newline at end of file diff --git a/plexpy/activity_processor.py b/plexpy/activity_processor.py index 24edc719..8fa2345c 100644 --- a/plexpy/activity_processor.py +++ b/plexpy/activity_processor.py @@ -115,7 +115,7 @@ class ActivityProcessor(object): session['media_type'] == 'track': logging_enabled = True else: - logger.debug(u"PlexPy Monitor :: ratingKey %s not logged. Does not meet logging criteria. " + logger.debug(u"PlexPy ActivityProcessor :: ratingKey %s not logged. Does not meet logging criteria. " u"Media type is '%s'" % (session['rating_key'], session['media_type'])) if str(session['paused_counter']).isdigit(): @@ -127,24 +127,24 @@ class ActivityProcessor(object): if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \ (real_play_time < int(plexpy.CONFIG.LOGGING_IGNORE_INTERVAL)): logging_enabled = False - logger.debug(u"PlexPy Monitor :: Play duration for ratingKey %s is %s secs which is less than %s " + logger.debug(u"PlexPy ActivityProcessor :: Play duration for ratingKey %s is %s secs which is less than %s " u"seconds, so we're not logging it." % (session['rating_key'], str(real_play_time), plexpy.CONFIG.LOGGING_IGNORE_INTERVAL)) elif is_import and import_ignore_interval: if (session['media_type'] == 'movie' or session['media_type'] == 'episode') and \ (real_play_time < int(import_ignore_interval)): logging_enabled = False - logger.debug(u"PlexPy Monitor :: Play duration for ratingKey %s is %s secs which is less than %s " + logger.debug(u"PlexPy ActivityProcessor :: Play duration for ratingKey %s is %s secs which is less than %s " u"seconds, so we're not logging it." % (session['rating_key'], str(real_play_time), import_ignore_interval)) if not user_details['keep_history'] and not is_import: logging_enabled = False - logger.debug(u"PlexPy Monitor :: History logging for user '%s' is disabled." % session['user']) + logger.debug(u"PlexPy ActivityProcessor :: History logging for user '%s' is disabled." % session['user']) if logging_enabled: - # logger.debug(u"PlexPy Monitor :: Attempting to write to session_history table...") + # logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history table...") query = 'INSERT INTO session_history (started, stopped, rating_key, parent_rating_key, ' \ 'grandparent_rating_key, media_type, user_id, user, ip_address, paused_counter, player, ' \ 'platform, machine_id, view_offset) VALUES ' \ @@ -155,14 +155,14 @@ class ActivityProcessor(object): session['ip_address'], session['paused_counter'], session['player'], session['platform'], session['machine_id'], session['view_offset']] - # logger.debug(u"PlexPy Monitor :: Writing session_history transaction...") + # logger.debug(u"PlexPy ActivityProcessor :: Writing session_history transaction...") self.db.action(query=query, args=args) - # logger.debug(u"PlexPy Monitor :: Successfully written history item, last id for session_history is %s" + # logger.debug(u"PlexPy ActivityProcessor :: Successfully written history item, last id for session_history is %s" # % last_id) # Write the session_history_media_info table - # logger.debug(u"PlexPy Monitor :: Attempting to write to session_history_media_info table...") + # logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history_media_info table...") query = 'INSERT INTO session_history_media_info (id, rating_key, video_decision, audio_decision, ' \ 'duration, width, height, container, video_codec, audio_codec, bitrate, video_resolution, ' \ 'video_framerate, aspect_ratio, audio_channels, transcode_protocol, transcode_container, ' \ @@ -178,11 +178,11 @@ class ActivityProcessor(object): session['transcode_video_codec'], session['transcode_audio_codec'], session['transcode_audio_channels'], session['transcode_width'], session['transcode_height']] - # logger.debug(u"PlexPy Monitor :: Writing session_history_media_info transaction...") + # logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_media_info transaction...") self.db.action(query=query, args=args) if not is_import: - logger.debug(u"PlexPy Monitor :: Fetching metadata for item ratingKey %s" % session['rating_key']) + logger.debug(u"PlexPy ActivityProcessor :: Fetching metadata for item ratingKey %s" % session['rating_key']) pms_connect = pmsconnect.PmsConnect() result = pms_connect.get_metadata_details(rating_key=str(session['rating_key'])) metadata = result['metadata'] @@ -203,7 +203,7 @@ class ActivityProcessor(object): else: full_title = metadata['title'] - # logger.debug(u"PlexPy Monitor :: Attempting to write to session_history_metadata table...") + # logger.debug(u"PlexPy ActivityProcessor :: Attempting to write to session_history_metadata table...") query = 'INSERT INTO session_history_metadata (id, rating_key, parent_rating_key, ' \ 'grandparent_rating_key, title, parent_title, grandparent_title, full_title, media_index, ' \ 'parent_media_index, thumb, parent_thumb, grandparent_thumb, art, media_type, year, ' \ @@ -220,12 +220,12 @@ class ActivityProcessor(object): metadata['last_viewed_at'], metadata['content_rating'], metadata['summary'], metadata['tagline'], metadata['rating'], metadata['duration'], metadata['guid'], directors, writers, actors, genres, metadata['studio']] - # logger.debug(u"PlexPy Monitor :: Writing session_history_metadata transaction...") + # logger.debug(u"PlexPy ActivityProcessor :: Writing session_history_metadata transaction...") self.db.action(query=query, args=args) def find_session_ip(self, rating_key=None, machine_id=None): - logger.debug(u"PlexPy Monitor :: Requesting log lines...") + logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...") log_lines = log_reader.get_log_tail(window=5000, parsed=False) rating_key_line = 'ratingKey=' + rating_key @@ -241,18 +241,18 @@ class ActivityProcessor(object): if ipv4: # The logged IP will always be the first match and we don't want localhost entries if ipv4[0] != '127.0.0.1': - logger.debug(u"PlexPy Monitor :: Matched IP address (%s) for stream ratingKey %s " + logger.debug(u"PlexPy ActivityProcessor :: Matched IP address (%s) for stream ratingKey %s " u"and machineIdentifier %s." % (ipv4[0], rating_key, machine_id)) return ipv4[0] - logger.debug(u"PlexPy Monitor :: Unable to find IP address on first pass. " + logger.debug(u"PlexPy ActivityProcessor :: Unable to find IP address on first pass. " u"Attempting fallback check in 5 seconds...") # Wait for the log to catch up and read in new lines time.sleep(5) - logger.debug(u"PlexPy Monitor :: Requesting log lines...") + logger.debug(u"PlexPy ActivityProcessor :: Requesting log lines...") log_lines = log_reader.get_log_tail(window=5000, parsed=False) for line in reversed(log_lines): @@ -264,11 +264,11 @@ class ActivityProcessor(object): if ipv4: # The logged IP will always be the first match and we don't want localhost entries if ipv4[0] != '127.0.0.1': - logger.debug(u"PlexPy Monitor :: Matched IP address (%s) for stream ratingKey %s." % + logger.debug(u"PlexPy ActivityProcessor :: Matched IP address (%s) for stream ratingKey %s." % (ipv4[0], rating_key)) return ipv4[0] - logger.debug(u"PlexPy Monitor :: Unable to find IP address on fallback search. Not logging IP address.") + logger.debug(u"PlexPy ActivityProcessor :: Unable to find IP address on fallback search. Not logging IP address.") return None @@ -327,3 +327,37 @@ class ActivityProcessor(object): keys = {'session_key': session_key} self.db.upsert('sessions', values, keys) + + def increment_session_buffer_count(self, session_key=None): + if str(session_key).isdigit(): + self.db.action('UPDATE sessions SET buffer_count = buffer_count + 1 ' + 'WHERE session_key = ?', + [session_key]) + + def get_session_buffer_count(self, session_key=None): + if str(session_key).isdigit(): + buffer_count = self.db.select_single('SELECT buffer_count ' + 'FROM sessions ' + 'WHERE session_key = ?', + [session_key]) + if buffer_count: + return buffer_count + + return 0 + + def set_session_buffer_trigger_time(self, session_key=None): + if str(session_key).isdigit(): + self.db.action('UPDATE sessions SET buffer_last_triggered = strftime("%s","now") ' + 'WHERE session_key = ?', + [session_key]) + + def get_session_buffer_trigger_time(self, session_key=None): + if str(session_key).isdigit(): + last_time = self.db.select_single('SELECT buffer_last_triggered ' + 'FROM sessions ' + 'WHERE session_key = ?', + [session_key]) + if last_time: + return last_time + + return None \ No newline at end of file