mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-11 07:46:07 -07:00
Add callback to force stop stale streams
This commit is contained in:
parent
a00f36f83b
commit
7e7609743a
3 changed files with 76 additions and 2 deletions
|
@ -32,6 +32,7 @@ import cherrypy
|
||||||
from apscheduler.schedulers.background import BackgroundScheduler
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
from apscheduler.triggers.interval import IntervalTrigger
|
from apscheduler.triggers.interval import IntervalTrigger
|
||||||
|
|
||||||
|
import activity_handler
|
||||||
import activity_pinger
|
import activity_pinger
|
||||||
import config
|
import config
|
||||||
import database
|
import database
|
||||||
|
@ -372,7 +373,7 @@ def initialize_scheduler():
|
||||||
try:
|
try:
|
||||||
SCHED.start()
|
SCHED.start()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.info(e)
|
logger.error(e)
|
||||||
|
|
||||||
|
|
||||||
def schedule_job(function, name, hours=0, minutes=0, seconds=0, args=None):
|
def schedule_job(function, name, hours=0, minutes=0, seconds=0, args=None):
|
||||||
|
@ -402,6 +403,9 @@ def start():
|
||||||
global _STARTED
|
global _STARTED
|
||||||
|
|
||||||
if _INITIALIZED:
|
if _INITIALIZED:
|
||||||
|
# Start the scheduler for stale stream callbacks
|
||||||
|
activity_handler.ACTIVITY_SCHED.start()
|
||||||
|
|
||||||
# Start background notification thread
|
# Start background notification thread
|
||||||
notification_handler.start_threads(num_threads=CONFIG.NOTIFICATION_THREADS)
|
notification_handler.start_threads(num_threads=CONFIG.NOTIFICATION_THREADS)
|
||||||
|
|
||||||
|
|
|
@ -13,9 +13,13 @@
|
||||||
# You should have received a copy of the GNU General Public License
|
# You should have received a copy of the GNU General Public License
|
||||||
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
|
# along with PlexPy. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
|
||||||
|
import datetime
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
|
||||||
|
from apscheduler.schedulers.background import BackgroundScheduler
|
||||||
|
from apscheduler.triggers.date import DateTrigger
|
||||||
|
|
||||||
import plexpy
|
import plexpy
|
||||||
import activity_processor
|
import activity_processor
|
||||||
import datafactory
|
import datafactory
|
||||||
|
@ -26,6 +30,8 @@ import notifiers
|
||||||
import pmsconnect
|
import pmsconnect
|
||||||
|
|
||||||
|
|
||||||
|
ACTIVITY_SCHED = BackgroundScheduler()
|
||||||
|
|
||||||
RECENTLY_ADDED_QUEUE = {}
|
RECENTLY_ADDED_QUEUE = {}
|
||||||
|
|
||||||
class ActivityHandler(object):
|
class ActivityHandler(object):
|
||||||
|
@ -206,6 +212,9 @@ class ActivityHandler(object):
|
||||||
|
|
||||||
# If we already have this session in the temp table, check for state changes
|
# If we already have this session in the temp table, check for state changes
|
||||||
if db_session:
|
if db_session:
|
||||||
|
# Re-schedule the callback to reset the 5 minutes timer
|
||||||
|
schedule_callback(self.get_session_key(), args=[self.get_session_key()], minutes=5)
|
||||||
|
|
||||||
last_state = db_session['state']
|
last_state = db_session['state']
|
||||||
last_key = str(db_session['rating_key'])
|
last_key = str(db_session['rating_key'])
|
||||||
|
|
||||||
|
@ -225,6 +234,10 @@ class ActivityHandler(object):
|
||||||
self.on_resume()
|
self.on_resume()
|
||||||
elif this_state == 'stopped':
|
elif this_state == 'stopped':
|
||||||
self.on_stop()
|
self.on_stop()
|
||||||
|
|
||||||
|
# Remove the callback if the stream is stopped
|
||||||
|
schedule_callback(self.get_session_key(), remove_job=True)
|
||||||
|
|
||||||
elif this_state == 'buffering':
|
elif this_state == 'buffering':
|
||||||
self.on_buffer()
|
self.on_buffer()
|
||||||
# If a client doesn't register stop events (I'm looking at you PHT!) check if the ratingKey has changed
|
# If a client doesn't register stop events (I'm looking at you PHT!) check if the ratingKey has changed
|
||||||
|
@ -250,6 +263,10 @@ class ActivityHandler(object):
|
||||||
if this_state != 'buffering':
|
if this_state != 'buffering':
|
||||||
self.on_start()
|
self.on_start()
|
||||||
|
|
||||||
|
# Schedule a callback to force stop a stale stream 5 minutes later
|
||||||
|
schedule_callback(self.get_session_key(), args=[self.get_session_key()], minutes=5)
|
||||||
|
|
||||||
|
|
||||||
class TimelineHandler(object):
|
class TimelineHandler(object):
|
||||||
|
|
||||||
def __init__(self, timeline):
|
def __init__(self, timeline):
|
||||||
|
@ -439,3 +456,50 @@ def del_keys(key):
|
||||||
del_keys(child_key)
|
del_keys(child_key)
|
||||||
elif key in RECENTLY_ADDED_QUEUE:
|
elif key in RECENTLY_ADDED_QUEUE:
|
||||||
del_keys(RECENTLY_ADDED_QUEUE.pop(key))
|
del_keys(RECENTLY_ADDED_QUEUE.pop(key))
|
||||||
|
|
||||||
|
|
||||||
|
def schedule_callback(id, remove_job=False, args=None, **kwargs):
|
||||||
|
if ACTIVITY_SCHED.get_job(str(id)):
|
||||||
|
if remove_job:
|
||||||
|
ACTIVITY_SCHED.remove_job(str(id))
|
||||||
|
else:
|
||||||
|
ACTIVITY_SCHED.reschedule_job(
|
||||||
|
str(id), args=args, trigger=DateTrigger(
|
||||||
|
run_date=datetime.datetime.now() + datetime.timedelta(**kwargs)))
|
||||||
|
elif not remove_job:
|
||||||
|
ACTIVITY_SCHED.add_job(
|
||||||
|
force_stop_stream, args=args, id=str(id), trigger=DateTrigger(
|
||||||
|
run_date=datetime.datetime.now() + datetime.timedelta(**kwargs)))
|
||||||
|
|
||||||
|
|
||||||
|
def force_stop_stream(session_key):
|
||||||
|
ap = activity_processor.ActivityProcessor()
|
||||||
|
session = ap.get_session_by_key(session_key=session_key)
|
||||||
|
|
||||||
|
success = ap.write_session_history(session=session)
|
||||||
|
|
||||||
|
if success:
|
||||||
|
# If session is written to the databaase successfully, remove the session from the session table
|
||||||
|
logger.info(u"PlexPy ActivityHandler :: Removing stale stream with sessionKey %s ratingKey %s from session queue"
|
||||||
|
% (session['session_key'], session['rating_key']))
|
||||||
|
ap.delete_session(session_key=session_key)
|
||||||
|
|
||||||
|
else:
|
||||||
|
sessions['write_attempts'] += 1
|
||||||
|
|
||||||
|
if sessions['write_attempts'] < plexpy.CONFIG.SESSION_DB_WRITE_ATTEMPTS:
|
||||||
|
logger.warn(u"PlexPy ActivityHandler :: Failed to write stream with sessionKey %s ratingKey %s to the database. " \
|
||||||
|
"Will try again in 30 seconds. Write attempt %s."
|
||||||
|
% (sessions['session_key'], sessions['rating_key'], str(sessions['write_attempts'])))
|
||||||
|
ap.increment_write_attempts(session_key=session_key)
|
||||||
|
|
||||||
|
# Reschedule for 30 seconds later
|
||||||
|
schedule_callback(session_key, args=[session_key], seconds=30)
|
||||||
|
|
||||||
|
else:
|
||||||
|
logger.warn(u"PlexPy Monitor :: Failed to write stream with sessionKey %s ratingKey %s to the database. " \
|
||||||
|
"Removing session from the database. Write attempt %s."
|
||||||
|
% (sessions['session_key'], sessions['rating_key'], str(sessions['write_attempts'])))
|
||||||
|
logger.info(u"PlexPy Monitor :: Removing stale stream with sessionKey %s ratingKey %s from session queue"
|
||||||
|
% (sessions['session_key'], sessions['rating_key']))
|
||||||
|
ap.delete_session(session_key=session_key)
|
|
@ -405,3 +405,9 @@ class ActivityProcessor(object):
|
||||||
def set_temp_stopped(self):
|
def set_temp_stopped(self):
|
||||||
stopped_time = int(time.time())
|
stopped_time = int(time.time())
|
||||||
self.db.action('UPDATE sessions SET stopped = ?', [stopped_time])
|
self.db.action('UPDATE sessions SET stopped = ?', [stopped_time])
|
||||||
|
|
||||||
|
def increment_write_attempts(self, session_key=None):
|
||||||
|
if str(session_key).isdigit():
|
||||||
|
session = self.get_session_by_key(session_key=session_key)
|
||||||
|
self.db.action('UPDATE sessions SET write_attempts = ? WHERE session_key = ?',
|
||||||
|
[session['write_attempts'] + 1, session_key])
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue