mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-06 21:21:15 -07:00
Refactor datatables static methods
This commit is contained in:
parent
b32b1bae7c
commit
9b4f4b8ae4
1 changed files with 162 additions and 151 deletions
|
@ -63,22 +63,22 @@ class DataTables(object):
|
|||
'named json_data.')
|
||||
return None
|
||||
|
||||
extracted_columns = self.extract_columns(columns=columns)
|
||||
join = self.build_join(join_types, join_tables, join_evals)
|
||||
group = self.build_grouping(group_by)
|
||||
c_where, cw_args = self.build_custom_where(custom_where)
|
||||
order = self.build_order(parameters['order'],
|
||||
extracted_columns = extract_columns(columns=columns)
|
||||
join = build_join(join_types, join_tables, join_evals)
|
||||
group = build_grouping(group_by)
|
||||
c_where, cw_args = build_custom_where(custom_where)
|
||||
order = build_order(parameters['order'],
|
||||
extracted_columns['column_named'],
|
||||
parameters['columns'])
|
||||
where, w_args = self.build_where(parameters['search']['value'],
|
||||
where, w_args = build_where(parameters['search']['value'],
|
||||
extracted_columns['column_named'],
|
||||
parameters['columns'])
|
||||
|
||||
# Build union parameters
|
||||
if table_name_union:
|
||||
extracted_columns_union = self.extract_columns(columns=columns_union)
|
||||
group_u = self.build_grouping(group_by_union)
|
||||
c_where_u, cwu_args = self.build_custom_where(custom_where_union)
|
||||
extracted_columns_union = extract_columns(columns=columns_union)
|
||||
group_u = build_grouping(group_by_union)
|
||||
c_where_u, cwu_args = build_custom_where(custom_where_union)
|
||||
union = 'UNION SELECT %s FROM %s %s %s' % (extracted_columns_union['column_string'],
|
||||
table_name_union,
|
||||
c_where_u,
|
||||
|
@ -117,172 +117,183 @@ class DataTables(object):
|
|||
|
||||
return output
|
||||
|
||||
def build_grouping(self, group_by=[]):
|
||||
# Build grouping
|
||||
group = ''
|
||||
|
||||
for g in group_by:
|
||||
group += g + ', '
|
||||
if group:
|
||||
group = 'GROUP BY ' + group.rstrip(', ')
|
||||
def build_grouping(group_by=[]):
|
||||
# Build grouping
|
||||
group = ''
|
||||
|
||||
return group
|
||||
for g in group_by:
|
||||
group += g + ', '
|
||||
if group:
|
||||
group = 'GROUP BY ' + group.rstrip(', ')
|
||||
|
||||
def build_join(self, join_types=[], join_tables=[], join_evals=[]):
|
||||
# Build join parameters
|
||||
join = ''
|
||||
return group
|
||||
|
||||
for i, join_type in enumerate(join_types):
|
||||
if join_type.upper() == 'LEFT OUTER JOIN':
|
||||
join += 'LEFT OUTER JOIN %s ON %s = %s ' % (join_tables[i], join_evals[i][0], join_evals[i][1])
|
||||
elif join_type.upper() == 'JOIN' or join_type.upper() == 'INNER JOIN':
|
||||
join += 'JOIN %s ON %s = %s ' % (join_tables[i], join_evals[i][0], join_evals[i][1])
|
||||
|
||||
return join
|
||||
def build_join(join_types=[], join_tables=[], join_evals=[]):
|
||||
# Build join parameters
|
||||
join = ''
|
||||
|
||||
def build_custom_where(self, custom_where=[]):
|
||||
# Build custom where parameters
|
||||
c_where = ''
|
||||
args = []
|
||||
for i, join_type in enumerate(join_types):
|
||||
if join_type.upper() == 'LEFT OUTER JOIN':
|
||||
join += 'LEFT OUTER JOIN %s ON %s = %s ' % (join_tables[i], join_evals[i][0], join_evals[i][1])
|
||||
elif join_type.upper() == 'JOIN' or join_type.upper() == 'INNER JOIN':
|
||||
join += 'JOIN %s ON %s = %s ' % (join_tables[i], join_evals[i][0], join_evals[i][1])
|
||||
|
||||
for w in custom_where:
|
||||
if isinstance(w[1], (list, tuple)) and len(w[1]):
|
||||
c_where += '('
|
||||
for w_ in w[1]:
|
||||
if w_ == None:
|
||||
c_where += w[0] + ' IS NULL OR '
|
||||
elif str(w_).startswith('LIKE '):
|
||||
c_where += w[0] + ' LIKE ? OR '
|
||||
args.append(w_[5:])
|
||||
else:
|
||||
c_where += w[0] + ' = ? OR '
|
||||
args.append(w_)
|
||||
c_where = c_where.rstrip(' OR ') + ') AND '
|
||||
else:
|
||||
if w[1] == None:
|
||||
c_where += w[0] + ' IS NULL AND '
|
||||
elif str(w[1]).startswith('LIKE '):
|
||||
c_where += w[0] + ' LIKE ? AND '
|
||||
args.append(w[1][5:])
|
||||
return join
|
||||
|
||||
|
||||
def build_custom_where(custom_where=[]):
|
||||
# Build custom where parameters
|
||||
c_where = ''
|
||||
args = []
|
||||
|
||||
for w in custom_where:
|
||||
and_or = ' OR ' if w[0].endswith('OR') else ' AND '
|
||||
w[0] = w[0].rstrip(' OR')
|
||||
|
||||
if isinstance(w[1], (list, tuple)) and len(w[1]):
|
||||
c_where += '('
|
||||
for w_ in w[1]:
|
||||
if w_ is None:
|
||||
c_where += w[0] + ' IS NULL'
|
||||
elif str(w_).startswith('LIKE '):
|
||||
c_where += w[0] + ' LIKE ?'
|
||||
args.append(w_[5:])
|
||||
else:
|
||||
c_where += w[0] + ' = ? AND '
|
||||
args.append(w[1])
|
||||
|
||||
if c_where:
|
||||
c_where = 'WHERE ' + c_where.rstrip(' AND ')
|
||||
|
||||
return c_where, args
|
||||
|
||||
def build_order(self, order_param=[], columns=[], dt_columns=[]):
|
||||
# Build ordering
|
||||
order = ''
|
||||
|
||||
for o in order_param:
|
||||
sort_order = ' COLLATE NOCASE'
|
||||
if o['dir'] == 'desc':
|
||||
sort_order += ' DESC'
|
||||
# We first see if a name was sent though for the column sort.
|
||||
if dt_columns[int(o['column'])]['data']:
|
||||
# We have a name, now check if it's a valid column name for our query
|
||||
# so we don't just inject a random value
|
||||
if any(d.lower() == dt_columns[int(o['column'])]['data'].lower()
|
||||
for d in columns):
|
||||
order += dt_columns[int(o['column'])]['data'] + '%s, ' % sort_order
|
||||
else:
|
||||
# if we receive a bogus name, rather not sort at all.
|
||||
pass
|
||||
# If no name exists for the column, just use the column index to sort
|
||||
c_where += w[0] + ' = ?'
|
||||
args.append(w_)
|
||||
c_where += ' OR '
|
||||
c_where = c_where.rstrip(' OR ') + ')' + and_or
|
||||
else:
|
||||
if w[1] is None:
|
||||
c_where += w[0] + ' IS NULL'
|
||||
elif str(w[1]).startswith('LIKE '):
|
||||
c_where += w[0] + ' LIKE ?'
|
||||
args.append(w[1][5:])
|
||||
else:
|
||||
order += columns[int(o['column'])] + ', '
|
||||
c_where += w[0] + ' = ?'
|
||||
args.append(w[1])
|
||||
|
||||
if order:
|
||||
order = 'ORDER BY ' + order.rstrip(', ')
|
||||
c_where += and_or
|
||||
|
||||
return order
|
||||
if c_where:
|
||||
c_where = 'WHERE ' + c_where.rstrip(' AND ').rstrip(' OR ')
|
||||
|
||||
def build_where(self, search_param='', columns=[], dt_columns=[]):
|
||||
# Build where parameters
|
||||
where = ''
|
||||
args = []
|
||||
return c_where, args
|
||||
|
||||
if search_param:
|
||||
for i, s in enumerate(dt_columns):
|
||||
if s['searchable']:
|
||||
# We first see if a name was sent though for the column search.
|
||||
if s['data']:
|
||||
# We have a name, now check if it's a valid column name for our query
|
||||
# so we don't just inject a random value
|
||||
if any(d.lower() == s['data'].lower() for d in columns):
|
||||
where += s['data'] + ' LIKE ? OR '
|
||||
args.append('%' + search_param + '%')
|
||||
else:
|
||||
# if we receive a bogus name, rather not search at all.
|
||||
pass
|
||||
# If no name exists for the column, just use the column index to search
|
||||
else:
|
||||
where += columns[i] + ' LIKE ? OR '
|
||||
|
||||
def build_order(order_param=[], columns=[], dt_columns=[]):
|
||||
# Build ordering
|
||||
order = ''
|
||||
|
||||
for o in order_param:
|
||||
sort_order = ' COLLATE NOCASE'
|
||||
if o['dir'] == 'desc':
|
||||
sort_order += ' DESC'
|
||||
# We first see if a name was sent though for the column sort.
|
||||
if dt_columns[int(o['column'])]['data']:
|
||||
# We have a name, now check if it's a valid column name for our query
|
||||
# so we don't just inject a random value
|
||||
if any(d.lower() == dt_columns[int(o['column'])]['data'].lower()
|
||||
for d in columns):
|
||||
order += dt_columns[int(o['column'])]['data'] + '%s, ' % sort_order
|
||||
else:
|
||||
# if we receive a bogus name, rather not sort at all.
|
||||
pass
|
||||
# If no name exists for the column, just use the column index to sort
|
||||
else:
|
||||
order += columns[int(o['column'])] + ', '
|
||||
|
||||
if order:
|
||||
order = 'ORDER BY ' + order.rstrip(', ')
|
||||
|
||||
return order
|
||||
|
||||
|
||||
def build_where(search_param='', columns=[], dt_columns=[]):
|
||||
# Build where parameters
|
||||
where = ''
|
||||
args = []
|
||||
|
||||
if search_param:
|
||||
for i, s in enumerate(dt_columns):
|
||||
if s['searchable']:
|
||||
# We first see if a name was sent though for the column search.
|
||||
if s['data']:
|
||||
# We have a name, now check if it's a valid column name for our query
|
||||
# so we don't just inject a random value
|
||||
if any(d.lower() == s['data'].lower() for d in columns):
|
||||
where += s['data'] + ' LIKE ? OR '
|
||||
args.append('%' + search_param + '%')
|
||||
if where:
|
||||
where = 'WHERE ' + where.rstrip(' OR ')
|
||||
|
||||
return where, args
|
||||
|
||||
# This method extracts column data from our column list
|
||||
# The first parameter is required, the match_columns parameter is optional and will cause the function to
|
||||
# only return results if the value also exists in the match_columns 'data' field
|
||||
@staticmethod
|
||||
def extract_columns(columns=None, match_columns=None):
|
||||
columns_string = ''
|
||||
columns_literal = []
|
||||
columns_named = []
|
||||
columns_order = []
|
||||
|
||||
for column in columns:
|
||||
# We allow using "as" in column names for more complex sql functions.
|
||||
# This function breaks up the column to get all it's parts.
|
||||
as_search = re.compile(' as ', re.IGNORECASE)
|
||||
|
||||
if re.search(as_search, column):
|
||||
column_named = re.split(as_search, column)[1].rpartition('.')[-1]
|
||||
column_literal = re.split(as_search, column)[0]
|
||||
column_order = re.split(as_search, column)[1]
|
||||
if match_columns:
|
||||
if any(d['data'].lower() == column_named.lower() for d in match_columns):
|
||||
columns_string += column + ', '
|
||||
columns_literal.append(column_literal)
|
||||
columns_named.append(column_named)
|
||||
columns_order.append(column_order)
|
||||
else:
|
||||
# if we receive a bogus name, rather not search at all.
|
||||
pass
|
||||
# If no name exists for the column, just use the column index to search
|
||||
else:
|
||||
where += columns[i] + ' LIKE ? OR '
|
||||
args.append('%' + search_param + '%')
|
||||
if where:
|
||||
where = 'WHERE ' + where.rstrip(' OR ')
|
||||
|
||||
return where, args
|
||||
|
||||
|
||||
# This method extracts column data from our column list
|
||||
# The first parameter is required, the match_columns parameter is optional and will cause the function to
|
||||
# only return results if the value also exists in the match_columns 'data' field
|
||||
def extract_columns(columns=None, match_columns=None):
|
||||
columns_string = ''
|
||||
columns_literal = []
|
||||
columns_named = []
|
||||
columns_order = []
|
||||
|
||||
for column in columns:
|
||||
# We allow using "as" in column names for more complex sql functions.
|
||||
# This function breaks up the column to get all it's parts.
|
||||
as_search = re.compile(' as ', re.IGNORECASE)
|
||||
|
||||
if re.search(as_search, column):
|
||||
column_named = re.split(as_search, column)[1].rpartition('.')[-1]
|
||||
column_literal = re.split(as_search, column)[0]
|
||||
column_order = re.split(as_search, column)[1]
|
||||
if match_columns:
|
||||
if any(d['data'].lower() == column_named.lower() for d in match_columns):
|
||||
columns_string += column + ', '
|
||||
columns_literal.append(column_literal)
|
||||
columns_named.append(column_named)
|
||||
columns_order.append(column_order)
|
||||
else:
|
||||
column_named = column.rpartition('.')[-1]
|
||||
if match_columns:
|
||||
if any(d['data'].lower() == column_named.lower() for d in match_columns):
|
||||
columns_string += column + ', '
|
||||
columns_literal.append(column)
|
||||
columns_named.append(column_named)
|
||||
columns_order.append(column)
|
||||
else:
|
||||
columns_string += column + ', '
|
||||
columns_literal.append(column_literal)
|
||||
columns_named.append(column_named)
|
||||
columns_order.append(column_order)
|
||||
else:
|
||||
column_named = column.rpartition('.')[-1]
|
||||
if match_columns:
|
||||
if any(d['data'].lower() == column_named.lower() for d in match_columns):
|
||||
columns_string += column + ', '
|
||||
columns_literal.append(column)
|
||||
columns_named.append(column_named)
|
||||
columns_order.append(column)
|
||||
else:
|
||||
columns_string += column + ', '
|
||||
columns_literal.append(column)
|
||||
columns_named.append(column_named)
|
||||
columns_order.append(column)
|
||||
|
||||
columns_string = columns_string.rstrip(', ')
|
||||
columns_string = columns_string.rstrip(', ')
|
||||
|
||||
# We return a dict of the column params
|
||||
# column_string is a comma seperated list of the exact column variables received.
|
||||
# column_literal is the text before the "as" if we have an "as". Usually a function.
|
||||
# column_named is the text after the "as", if we have an "as". Any table prefix is also stripped off.
|
||||
# We use this to match with columns received from the Datatables request.
|
||||
# column_order is the text after the "as", if we have an "as". Any table prefix is left intact.
|
||||
column_data = {'column_string': columns_string,
|
||||
'column_literal': columns_literal,
|
||||
'column_named': columns_named,
|
||||
'column_order': columns_order
|
||||
}
|
||||
# We return a dict of the column params
|
||||
# column_string is a comma seperated list of the exact column variables received.
|
||||
# column_literal is the text before the "as" if we have an "as". Usually a function.
|
||||
# column_named is the text after the "as", if we have an "as". Any table prefix is also stripped off.
|
||||
# We use this to match with columns received from the Datatables request.
|
||||
# column_order is the text after the "as", if we have an "as". Any table prefix is left intact.
|
||||
column_data = {'column_string': columns_string,
|
||||
'column_literal': columns_literal,
|
||||
'column_named': columns_named,
|
||||
'column_order': columns_order
|
||||
}
|
||||
|
||||
return column_data
|
||||
return column_data
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue