Do no initialize db connection in ActivityProcessor

This commit is contained in:
JonnyWong16 2024-09-15 14:26:45 -07:00
parent ac32297160
commit 596cf57d61
No known key found for this signature in database
GPG key ID: B1F1F9807184697A

View file

@ -27,11 +27,10 @@ from plexpy import users
class ActivityProcessor(object): class ActivityProcessor(object):
def __init__(self):
self.db = database.MonitorDatabase()
def write_session(self, session=None, notify=True): def write_session(self, session=None, notify=True):
if session: if session:
db = database.MonitorDatabase()
values = {'session_key': session.get('session_key', ''), values = {'session_key': session.get('session_key', ''),
'session_id': session.get('session_id', ''), 'session_id': session.get('session_id', ''),
'transcode_key': session.get('transcode_key', ''), 'transcode_key': session.get('transcode_key', ''),
@ -149,7 +148,7 @@ class ActivityProcessor(object):
keys = {'session_key': session.get('session_key', ''), keys = {'session_key': session.get('session_key', ''),
'rating_key': session.get('rating_key', '')} 'rating_key': session.get('rating_key', '')}
result = self.db.upsert('sessions', values, keys) result = db.upsert('sessions', values, keys)
if result == 'insert': if result == 'insert':
# If it's our first write then time stamp it. # If it's our first write then time stamp it.
@ -159,7 +158,7 @@ class ActivityProcessor(object):
media_type=values['media_type'], media_type=values['media_type'],
started=started) started=started)
timestamp = {'started': started, 'initial_stream': initial_stream} timestamp = {'started': started, 'initial_stream': initial_stream}
self.db.upsert('sessions', timestamp, keys) db.upsert('sessions', timestamp, keys)
# Check if any notification agents have notifications enabled # Check if any notification agents have notifications enabled
if notify: if notify:
@ -260,6 +259,8 @@ class ActivityProcessor(object):
logger.debug("Tautulli ActivityProcessor :: History logging for library '%s' is disabled." % library_details['section_name']) logger.debug("Tautulli ActivityProcessor :: History logging for library '%s' is disabled." % library_details['section_name'])
if logging_enabled: if logging_enabled:
db = database.MonitorDatabase()
media_info = {} media_info = {}
# Fetch metadata first so we can return false if it fails # Fetch metadata first so we can return false if it fails
@ -316,10 +317,10 @@ class ActivityProcessor(object):
# logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history transaction..." # logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history transaction..."
# % session['session_key']) # % session['session_key'])
self.db.upsert(table_name='session_history', key_dict=keys, value_dict=values) db.upsert(table_name='session_history', key_dict=keys, value_dict=values)
# Get the last insert row id # Get the last insert row id
last_id = self.db.last_insert_id() last_id = db.last_insert_id()
self.group_history(last_id, session, metadata) self.group_history(last_id, session, metadata)
# logger.debug("Tautulli ActivityProcessor :: Successfully written history item, last id for session_history is %s" # logger.debug("Tautulli ActivityProcessor :: Successfully written history item, last id for session_history is %s"
@ -410,7 +411,7 @@ class ActivityProcessor(object):
# logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history_media_info transaction..." # logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history_media_info transaction..."
# % session['session_key']) # % session['session_key'])
self.db.upsert(table_name='session_history_media_info', key_dict=keys, value_dict=values) db.upsert(table_name='session_history_media_info', key_dict=keys, value_dict=values)
# Write the session_history_metadata table # Write the session_history_metadata table
directors = ";".join(metadata['directors']) directors = ";".join(metadata['directors'])
@ -475,7 +476,7 @@ class ActivityProcessor(object):
# logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history_metadata transaction..." # logger.debug("Tautulli ActivityProcessor :: Writing sessionKey %s session_history_metadata transaction..."
# % session['session_key']) # % session['session_key'])
self.db.upsert(table_name='session_history_metadata', key_dict=keys, value_dict=values) db.upsert(table_name='session_history_metadata', key_dict=keys, value_dict=values)
# Return the session row id when the session is successfully written to the database # Return the session row id when the session is successfully written to the database
return session['id'] return session['id']
@ -484,6 +485,8 @@ class ActivityProcessor(object):
new_session = prev_session = None new_session = prev_session = None
prev_watched = None prev_watched = None
db = database.MonitorDatabase()
if session['live']: if session['live']:
# Check if we should group the session, select the last guid from the user within the last day # Check if we should group the session, select the last guid from the user within the last day
query = "SELECT session_history.id, session_history_metadata.guid, session_history.reference_id " \ query = "SELECT session_history.id, session_history_metadata.guid, session_history.reference_id " \
@ -495,7 +498,7 @@ class ActivityProcessor(object):
args = [last_id, session['user_id']] args = [last_id, session['user_id']]
result = self.db.select(query=query, args=args) result = db.select(query=query, args=args)
if len(result) > 0: if len(result) > 0:
new_session = {'id': last_id, new_session = {'id': last_id,
@ -515,7 +518,7 @@ class ActivityProcessor(object):
args = [last_id, session['user_id'], session['rating_key']] args = [last_id, session['user_id'], session['rating_key']]
result = self.db.select(query=query, args=args) result = db.select(query=query, args=args)
if len(result) > 1: if len(result) > 1:
new_session = {'id': result[0]['id'], new_session = {'id': result[0]['id'],
@ -558,9 +561,10 @@ class ActivityProcessor(object):
logger.debug("Tautulli ActivityProcessor :: Not grouping history for sessionKey %s", session['session_key']) logger.debug("Tautulli ActivityProcessor :: Not grouping history for sessionKey %s", session['session_key'])
args = [last_id, last_id] args = [last_id, last_id]
self.db.action(query=query, args=args) db.action(query=query, args=args)
def get_sessions(self, user_id=None, ip_address=None): def get_sessions(self, user_id=None, ip_address=None):
db = database.MonitorDatabase()
query = "SELECT * FROM sessions" query = "SELECT * FROM sessions"
args = [] args = []
@ -569,12 +573,13 @@ class ActivityProcessor(object):
query += " WHERE user_id = ?" + ip query += " WHERE user_id = ?" + ip
args.append(user_id) args.append(user_id)
sessions = self.db.select(query, args) sessions = db.select(query, args)
return sessions return sessions
def get_session_by_key(self, session_key=None): def get_session_by_key(self, session_key=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
session = self.db.select_single("SELECT * FROM sessions " session = db.select_single("SELECT * FROM sessions "
"WHERE session_key = ? ", "WHERE session_key = ? ",
args=[session_key]) args=[session_key])
if session: if session:
@ -583,8 +588,9 @@ class ActivityProcessor(object):
return None return None
def get_session_by_id(self, session_id=None): def get_session_by_id(self, session_id=None):
db = database.MonitorDatabase()
if session_id: if session_id:
session = self.db.select_single("SELECT * FROM sessions " session = db.select_single("SELECT * FROM sessions "
"WHERE session_id = ? ", "WHERE session_id = ? ",
args=[session_id]) args=[session_id])
if session: if session:
@ -593,6 +599,7 @@ class ActivityProcessor(object):
return None return None
def set_session_state(self, session_key=None, state=None, **kwargs): def set_session_state(self, session_key=None, state=None, **kwargs):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
values = {} values = {}
@ -603,21 +610,23 @@ class ActivityProcessor(object):
values[k] = v values[k] = v
keys = {'session_key': session_key} keys = {'session_key': session_key}
result = self.db.upsert('sessions', values, keys) result = db.upsert('sessions', values, keys)
return result return result
return None return None
def delete_session(self, session_key=None, row_id=None): def delete_session(self, session_key=None, row_id=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
self.db.action("DELETE FROM sessions WHERE session_key = ?", [session_key]) db.action("DELETE FROM sessions WHERE session_key = ?", [session_key])
elif str(row_id).isdigit(): elif str(row_id).isdigit():
self.db.action("DELETE FROM sessions WHERE id = ?", [row_id]) db.action("DELETE FROM sessions WHERE id = ?", [row_id])
def set_session_last_paused(self, session_key=None, timestamp=None): def set_session_last_paused(self, session_key=None, timestamp=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
result = self.db.select("SELECT last_paused, paused_counter " result = db.select("SELECT last_paused, paused_counter "
"FROM sessions " "FROM sessions "
"WHERE session_key = ?", args=[session_key]) "WHERE session_key = ?", args=[session_key])
@ -636,17 +645,19 @@ class ActivityProcessor(object):
values['paused_counter'] = paused_counter values['paused_counter'] = paused_counter
keys = {'session_key': session_key} keys = {'session_key': session_key}
self.db.upsert('sessions', values, keys) db.upsert('sessions', values, keys)
def increment_session_buffer_count(self, session_key=None): def increment_session_buffer_count(self, session_key=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
self.db.action("UPDATE sessions SET buffer_count = buffer_count + 1 " db.action("UPDATE sessions SET buffer_count = buffer_count + 1 "
"WHERE session_key = ?", "WHERE session_key = ?",
[session_key]) [session_key])
def get_session_buffer_count(self, session_key=None): def get_session_buffer_count(self, session_key=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
buffer_count = self.db.select_single("SELECT buffer_count " buffer_count = db.select_single("SELECT buffer_count "
"FROM sessions " "FROM sessions "
"WHERE session_key = ?", "WHERE session_key = ?",
[session_key]) [session_key])
@ -656,14 +667,16 @@ class ActivityProcessor(object):
return 0 return 0
def set_session_buffer_trigger_time(self, session_key=None): def set_session_buffer_trigger_time(self, session_key=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
self.db.action("UPDATE sessions SET buffer_last_triggered = strftime('%s', 'now') " db.action("UPDATE sessions SET buffer_last_triggered = strftime('%s', 'now') "
"WHERE session_key = ?", "WHERE session_key = ?",
[session_key]) [session_key])
def get_session_buffer_trigger_time(self, session_key=None): def get_session_buffer_trigger_time(self, session_key=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
last_time = self.db.select_single("SELECT buffer_last_triggered " last_time = db.select_single("SELECT buffer_last_triggered "
"FROM sessions " "FROM sessions "
"WHERE session_key = ?", "WHERE session_key = ?",
[session_key]) [session_key])
@ -673,37 +686,43 @@ class ActivityProcessor(object):
return None return None
def set_temp_stopped(self): def set_temp_stopped(self):
db = database.MonitorDatabase()
stopped_time = helpers.timestamp() stopped_time = helpers.timestamp()
self.db.action("UPDATE sessions SET stopped = ?", [stopped_time]) db.action("UPDATE sessions SET stopped = ?", [stopped_time])
def increment_write_attempts(self, session_key=None): def increment_write_attempts(self, session_key=None):
db = database.MonitorDatabase()
if str(session_key).isdigit(): if str(session_key).isdigit():
session = self.get_session_by_key(session_key=session_key) session = self.get_session_by_key(session_key=session_key)
self.db.action("UPDATE sessions SET write_attempts = ? WHERE session_key = ?", db.action("UPDATE sessions SET write_attempts = ? WHERE session_key = ?",
[session['write_attempts'] + 1, session_key]) [session['write_attempts'] + 1, session_key])
def set_marker(self, session_key=None, marker_idx=None, marker_type=None): def set_marker(self, session_key=None, marker_idx=None, marker_type=None):
db = database.MonitorDatabase()
marker_args = [ marker_args = [
int(marker_type == 'intro'), int(marker_type == 'intro'),
int(marker_type == 'commercial'), int(marker_type == 'commercial'),
int(marker_type == 'credits') int(marker_type == 'credits')
] ]
self.db.action("UPDATE sessions SET intro = ?, commercial = ?, credits = ?, marker = ? " db.action("UPDATE sessions SET intro = ?, commercial = ?, credits = ?, marker = ? "
"WHERE session_key = ?", "WHERE session_key = ?",
marker_args + [marker_idx, session_key]) marker_args + [marker_idx, session_key])
def set_watched(self, session_key=None): def set_watched(self, session_key=None):
self.db.action("UPDATE sessions SET watched = ? " db = database.MonitorDatabase()
db.action("UPDATE sessions SET watched = ? "
"WHERE session_key = ?", "WHERE session_key = ?",
[1, session_key]) [1, session_key])
def write_continued_session(self, user_id=None, machine_id=None, media_type=None, stopped=None): def write_continued_session(self, user_id=None, machine_id=None, media_type=None, stopped=None):
db = database.MonitorDatabase()
keys = {'user_id': user_id, 'machine_id': machine_id, 'media_type': media_type} keys = {'user_id': user_id, 'machine_id': machine_id, 'media_type': media_type}
values = {'stopped': stopped} values = {'stopped': stopped}
self.db.upsert(table_name='sessions_continued', key_dict=keys, value_dict=values) db.upsert(table_name='sessions_continued', key_dict=keys, value_dict=values)
def is_initial_stream(self, user_id=None, machine_id=None, media_type=None, started=None): def is_initial_stream(self, user_id=None, machine_id=None, media_type=None, started=None):
last_session = self.db.select_single("SELECT stopped " db = database.MonitorDatabase()
last_session = db.select_single("SELECT stopped "
"FROM sessions_continued " "FROM sessions_continued "
"WHERE user_id = ? AND machine_id = ? AND media_type = ? " "WHERE user_id = ? AND machine_id = ? AND media_type = ? "
"ORDER BY stopped DESC", "ORDER BY stopped DESC",
@ -717,11 +736,12 @@ class ActivityProcessor(object):
logger.info("Tautulli ActivityProcessor :: Regrouping session history...") logger.info("Tautulli ActivityProcessor :: Regrouping session history...")
db = database.MonitorDatabase()
query = ( query = (
"SELECT * FROM session_history " "SELECT * FROM session_history "
"JOIN session_history_metadata ON session_history.id = session_history_metadata.id" "JOIN session_history_metadata ON session_history.id = session_history_metadata.id"
) )
results = self.db.select(query) results = db.select(query)
count = len(results) count = len(results)
progress = 0 progress = 0