# This file is part of Tautulli. # # Tautulli is free software: you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation, either version 3 of the License, or # (at your option) any later version. # # Tautulli is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Tautulli. If not, see . import base64 import datetime from functools import wraps import geoip2.database, geoip2.errors import gzip import hashlib import imghdr import ipwhois, ipwhois.exceptions, ipwhois.utils from IPy import IP import json import math import maxminddb from operator import itemgetter import os import re import socket import sys import time import unicodedata import urllib, urllib2 from xml.dom import minidom import xmltodict import plexpy import logger import request from plexpy.api2 import API2 def addtoapi(*dargs, **dkwargs): """ Helper decorator that adds function to the API class. is used to reuse as much code as possible args: dargs: (string, optional) Used to rename a function Example: @addtoapi("i_was_renamed", "im_a_second_alias") @addtoapi() """ def rd(function): @wraps(function) def wrapper(*args, **kwargs): return function(*args, **kwargs) if dargs: # To rename the function if it sucks.. and # allow compat with old api. for n in dargs: if function.__doc__ and len(function.__doc__): function.__doc__ = function.__doc__.strip() setattr(API2, n, function) return wrapper if function.__doc__ and len(function.__doc__): function.__doc__ = function.__doc__.strip() setattr(API2, function.__name__, function) return wrapper return rd def multikeysort(items, columns): comparers = [((itemgetter(col[1:].strip()), -1) if col.startswith('-') else (itemgetter(col.strip()), 1)) for col in columns] def comparer(left, right): for fn, mult in comparers: result = cmp(fn(left), fn(right)) if result: return mult * result else: return 0 return sorted(items, cmp=comparer) def checked(variable): if variable: return 'Checked' else: return '' def radio(variable, pos): if variable == pos: return 'Checked' else: return '' def latinToAscii(unicrap): """ From couch potato """ xlate = { 0xc0: 'A', 0xc1: 'A', 0xc2: 'A', 0xc3: 'A', 0xc4: 'A', 0xc5: 'A', 0xc6: 'Ae', 0xc7: 'C', 0xc8: 'E', 0xc9: 'E', 0xca: 'E', 0xcb: 'E', 0x86: 'e', 0xcc: 'I', 0xcd: 'I', 0xce: 'I', 0xcf: 'I', 0xd0: 'Th', 0xd1: 'N', 0xd2: 'O', 0xd3: 'O', 0xd4: 'O', 0xd5: 'O', 0xd6: 'O', 0xd8: 'O', 0xd9: 'U', 0xda: 'U', 0xdb: 'U', 0xdc: 'U', 0xdd: 'Y', 0xde: 'th', 0xdf: 'ss', 0xe0: 'a', 0xe1: 'a', 0xe2: 'a', 0xe3: 'a', 0xe4: 'a', 0xe5: 'a', 0xe6: 'ae', 0xe7: 'c', 0xe8: 'e', 0xe9: 'e', 0xea: 'e', 0xeb: 'e', 0x0259: 'e', 0xec: 'i', 0xed: 'i', 0xee: 'i', 0xef: 'i', 0xf0: 'th', 0xf1: 'n', 0xf2: 'o', 0xf3: 'o', 0xf4: 'o', 0xf5: 'o', 0xf6: 'o', 0xf8: 'o', 0xf9: 'u', 0xfa: 'u', 0xfb: 'u', 0xfc: 'u', 0xfd: 'y', 0xfe: 'th', 0xff: 'y', 0xa1: '!', 0xa2: '{cent}', 0xa3: '{pound}', 0xa4: '{currency}', 0xa5: '{yen}', 0xa6: '|', 0xa7: '{section}', 0xa8: '{umlaut}', 0xa9: '{C}', 0xaa: '{^a}', 0xab: '<<', 0xac: '{not}', 0xad: '-', 0xae: '{R}', 0xaf: '_', 0xb0: '{degrees}', 0xb1: '{+/-}', 0xb2: '{^2}', 0xb3: '{^3}', 0xb4: "'", 0xb5: '{micro}', 0xb6: '{paragraph}', 0xb7: '*', 0xb8: '{cedilla}', 0xb9: '{^1}', 0xba: '{^o}', 0xbb: '>>', 0xbc: '{1/4}', 0xbd: '{1/2}', 0xbe: '{3/4}', 0xbf: '?', 0xd7: '*', 0xf7: '/' } r = '' if unicrap: for i in unicrap: if ord(i) in xlate: r += xlate[ord(i)] elif ord(i) >= 0x80: pass else: r += str(i) return r def convert_milliseconds(ms): seconds = ms / 1000 gmtime = time.gmtime(seconds) if seconds > 3600: minutes = time.strftime("%H:%M:%S", gmtime) else: minutes = time.strftime("%M:%S", gmtime) return minutes def convert_milliseconds_to_minutes(ms): if str(ms).isdigit(): seconds = float(ms) / 1000 minutes = round(seconds / 60, 0) return math.trunc(minutes) return 0 def convert_seconds(s): gmtime = time.gmtime(s) if s > 3600: minutes = time.strftime("%H:%M:%S", gmtime) else: minutes = time.strftime("%M:%S", gmtime) return minutes def convert_seconds_to_minutes(s): if str(s).isdigit(): minutes = round(float(s) / 60, 0) return math.trunc(minutes) return 0 def today(): today = datetime.date.today() yyyymmdd = datetime.date.isoformat(today) return yyyymmdd def now(): now = datetime.datetime.now() return now.strftime("%Y-%m-%d %H:%M:%S") def human_duration(s, sig='dhms'): hd = '' if str(s).isdigit() and s > 0: d = int(s / 86400) h = int((s % 86400) / 3600) m = int(((s % 86400) % 3600) / 60) s = int(((s % 86400) % 3600) % 60) hd_list = [] if sig >= 'd' and d > 0: d = d + 1 if sig == 'd' and h >= 12 else d hd_list.append(str(d) + ' days') if sig >= 'dh' and h > 0: h = h + 1 if sig == 'dh' and m >= 30 else h hd_list.append(str(h) + ' hrs') if sig >= 'dhm' and m > 0: m = m + 1 if sig == 'dhm' and s >= 30 else m hd_list.append(str(m) + ' mins') if sig >= 'dhms' and s > 0: hd_list.append(str(s) + ' secs') hd = ' '.join(hd_list) else: hd = '0' return hd def get_age(date): try: split_date = date.split('-') except: return False try: days_old = int(split_date[0]) * 365 + int(split_date[1]) * 30 + int(split_date[2]) except IndexError: days_old = False return days_old def bytes_to_mb(bytes): mb = int(bytes) / 1048576 size = '%.1f MB' % mb return size def mb_to_bytes(mb_str): result = re.search('^(\d+(?:\.\d+)?)\s?(?:mb)?', mb_str, flags=re.I) if result: return int(float(result.group(1)) * 1048576) def piratesize(size): split = size.split(" ") factor = float(split[0]) unit = split[1].upper() if unit == 'MiB': size = factor * 1048576 elif unit == 'MB': size = factor * 1000000 elif unit == 'GiB': size = factor * 1073741824 elif unit == 'GB': size = factor * 1000000000 elif unit == 'KiB': size = factor * 1024 elif unit == 'KB': size = factor * 1000 elif unit == "B": size = factor else: size = 0 return size def replace_all(text, dic, normalize=False): if not text: return '' for i, j in dic.iteritems(): if normalize: try: if sys.platform == 'darwin': j = unicodedata.normalize('NFD', j) else: j = unicodedata.normalize('NFC', j) except TypeError: j = unicodedata.normalize('NFC', j.decode(plexpy.SYS_ENCODING, 'replace')) text = text.replace(i, j) return text def replace_illegal_chars(string, type="file"): if type == "file": string = re.sub('[\?"*:|<>/]', '_', string) if type == "folder": string = re.sub('[:\?<>"|]', '_', string) return string def cleanName(string): pass1 = latinToAscii(string).lower() out_string = re.sub('[\.\-\/\!\@\#\$\%\^\&\*\(\)\+\-\"\'\,\;\:\[\]\{\}\<\>\=\_]', '', pass1).encode('utf-8') return out_string def cleanTitle(title): title = re.sub('[\.\-\/\_]', ' ', title).lower() # Strip out extra whitespace title = ' '.join(title.split()) title = title.title() return title def split_path(f): """ Split a path into components, starting with the drive letter (if any). Given a path, os.path.join(*split_path(f)) should be path equal to f. """ components = [] drive, path = os.path.splitdrive(f) # Strip the folder from the path, iterate until nothing is left while True: path, folder = os.path.split(path) if folder: components.append(folder) else: if path: components.append(path) break # Append the drive (if any) if drive: components.append(drive) # Reverse components components.reverse() # Done return components def extract_logline(s): # Default log format pattern = re.compile(r'(?P.*?)\s\-\s(?P.*?)\s*\:\:\s(?P.*?)\s\:\s(?P.*)', re.VERBOSE) match = pattern.match(s) if match: timestamp = match.group("timestamp") level = match.group("level") thread = match.group("thread") message = match.group("message") return (timestamp, level, thread, message) else: return None def split_string(mystring, splitvar=','): mylist = [] for each_word in mystring.split(splitvar): mylist.append(each_word.strip()) return mylist def create_https_certificates(ssl_cert, ssl_key): """ Create a self-signed HTTPS certificate and store in it in 'ssl_cert' and 'ssl_key'. Method assumes pyOpenSSL is installed. This code is stolen from SickBeard (http://github.com/midgetspy/Sick-Beard). """ from OpenSSL import crypto from certgen import createKeyPair, createSelfSignedCertificate, TYPE_RSA serial = int(time.time()) domains = ['DNS:' + d.strip() for d in plexpy.CONFIG.HTTPS_DOMAIN.split(',') if d] ips = ['IP:' + d.strip() for d in plexpy.CONFIG.HTTPS_IP.split(',') if d] altNames = ','.join(domains + ips) # Create the self-signed Tautulli certificate logger.debug(u"Generating self-signed SSL certificate.") pkey = createKeyPair(TYPE_RSA, 2048) cert = createSelfSignedCertificate(("Tautulli", pkey), serial, (0, 60 * 60 * 24 * 365 * 10), altNames) # ten years # Save the key and certificate to disk try: with open(ssl_cert, "w") as fp: fp.write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert)) with open(ssl_key, "w") as fp: fp.write(crypto.dump_privatekey(crypto.FILETYPE_PEM, pkey)) except IOError as e: logger.error("Error creating SSL key and certificate: %s", e) return False return True def cast_to_int(s): try: return int(s) except (ValueError, TypeError): return 0 def cast_to_float(s): try: return float(s) except (ValueError, TypeError): return 0 def convert_xml_to_json(xml): o = xmltodict.parse(xml) return json.dumps(o) def convert_xml_to_dict(xml): o = xmltodict.parse(xml) return o def get_percent(value1, value2): value1 = cast_to_float(value1) value2 = cast_to_float(value2) if value1 != 0 and value2 != 0: percent = (value1 / value2) * 100 else: percent = 0 return math.trunc(percent) def hex_to_int(hex): try: return int(hex, 16) except (ValueError, TypeError): return 0 def parse_xml(unparsed=None): if unparsed: try: xml_parse = minidom.parseString(unparsed) return xml_parse except Exception as e: logger.warn("Error parsing XML. %s" % e) return [] except: logger.warn("Error parsing XML.") return [] else: logger.warn("XML parse request made but no data received.") return [] """ Validate xml keys to make sure they exist and return their attribute value, return blank value is none found """ def get_xml_attr(xml_key, attribute, return_bool=False, default_return=''): if xml_key.getAttribute(attribute): if return_bool: return True else: return xml_key.getAttribute(attribute) else: if return_bool: return False else: return default_return def process_json_kwargs(json_kwargs): params = {} if json_kwargs: params = json.loads(json_kwargs) return params def sanitize(string): if string: return unicode(string).replace('<','<').replace('>','>') else: return '' def is_public_ip(host): ip = is_valid_ip(get_ip(host)) if ip and ip.iptype() == 'PUBLIC': return True return False def get_ip(host): ip_address = '' if is_valid_ip(host): return host else: try: ip_address = socket.getaddrinfo(host, None)[0][4][0] logger.debug(u"IP Checker :: Resolved %s to %s." % (host, ip_address)) except: logger.error(u"IP Checker :: Bad IP or hostname provided.") return ip_address def is_valid_ip(address): try: return IP(address) except TypeError: return False except ValueError: return False def install_geoip_db(): maxmind_url = 'http://geolite.maxmind.com/download/geoip/database/' geolite2_gz = 'GeoLite2-City.mmdb.gz' geolite2_md5 = 'GeoLite2-City.md5' geolite2_db = geolite2_gz[:-3] md5_checksum = '' temp_gz = os.path.join(plexpy.CONFIG.CACHE_DIR, geolite2_gz) geolite2_db = plexpy.CONFIG.GEOIP_DB or os.path.join(plexpy.DATA_DIR, geolite2_db) # Retrieve the GeoLite2 gzip file logger.debug(u"Tautulli Helpers :: Downloading GeoLite2 gzip file from MaxMind...") try: maxmind = urllib.URLopener() maxmind.retrieve(maxmind_url + geolite2_gz, temp_gz) md5_checksum = urllib2.urlopen(maxmind_url + geolite2_md5).read() except Exception as e: logger.error(u"Tautulli Helpers :: Failed to download GeoLite2 gzip file from MaxMind: %s" % e) return False # Extract the GeoLite2 database file logger.debug(u"Tautulli Helpers :: Extracting GeoLite2 database...") try: with gzip.open(temp_gz, 'rb') as gz: with open(geolite2_db, 'wb') as db: db.write(gz.read()) except Exception as e: logger.error(u"Tautulli Helpers :: Failed to extract the GeoLite2 database: %s" % e) return False # Check MD5 hash for GeoLite2 database file logger.debug(u"Tautulli Helpers :: Checking MD5 checksum for GeoLite2 database...") try: hash_md5 = hashlib.md5() with open(geolite2_db, 'rb') as f: for chunk in iter(lambda: f.read(4096), b""): hash_md5.update(chunk) md5_hash = hash_md5.hexdigest() if md5_hash != md5_checksum: logger.error(u"Tautulli Helpers :: MD5 checksum doesn't match for GeoLite2 database. " "Checksum: %s, file hash: %s" % (md5_checksum, md5_hash)) return False except Exception as e: logger.error(u"Tautulli Helpers :: Failed to generate MD5 checksum for GeoLite2 database: %s" % e) return False # Delete temportary GeoLite2 gzip file logger.debug(u"Tautulli Helpers :: Deleting temporary GeoLite2 gzip file...") try: os.remove(temp_gz) except Exception as e: logger.warn(u"Tautulli Helpers :: Failed to remove temporary GeoLite2 gzip file: %s" % e) logger.debug(u"Tautulli Helpers :: GeoLite2 database installed successfully.") plexpy.CONFIG.__setattr__('GEOIP_DB', geolite2_db) plexpy.CONFIG.write() return True def uninstall_geoip_db(): logger.debug(u"Tautulli Helpers :: Uninstalling the GeoLite2 database...") try: os.remove(plexpy.CONFIG.GEOIP_DB) plexpy.CONFIG.__setattr__('GEOIP_DB', '') plexpy.CONFIG.write() except Exception as e: logger.error(u"Tautulli Helpers :: Failed to uninstall the GeoLite2 database: %s" % e) return False logger.debug(u"Tautulli Helpers :: GeoLite2 database uninstalled successfully.") return True def geoip_lookup(ip_address): if not plexpy.CONFIG.GEOIP_DB: return 'GeoLite2 database not installed. Please install from the ' \ 'Settings page.' if not ip_address: return 'No IP address provided.' try: reader = geoip2.database.Reader(plexpy.CONFIG.GEOIP_DB) geo = reader.city(ip_address) reader.close() except ValueError as e: return 'Invalid IP address provided: %s.' % ip_address except IOError as e: return 'Missing GeoLite2 database. Please reinstall from the ' \ 'Settings page.' except maxminddb.InvalidDatabaseError as e: return 'Invalid GeoLite2 database. Please reinstall from the ' \ 'Settings page.' except geoip2.errors.AddressNotFoundError as e: return '%s' % e except Exception as e: return 'Error: %s' % e geo_info = {'continent': geo.continent.name, 'country': geo.country.name, 'region': geo.subdivisions.most_specific.name, 'city': geo.city.name, 'postal_code': geo.postal.code, 'timezone': geo.location.time_zone, 'latitude': geo.location.latitude, 'longitude': geo.location.longitude, 'accuracy': geo.location.accuracy_radius } return geo_info def whois_lookup(ip_address): nets = [] err = None try: whois = ipwhois.IPWhois(ip_address).lookup_whois(retry_count=0) countries = ipwhois.utils.get_countries() nets = whois['nets'] for net in nets: net['country'] = countries.get(net['country']) if net['postal_code']: net['postal_code'] = net['postal_code'].replace('-', ' ') except ValueError as e: err = 'Invalid IP address provided: %s.' % ip_address except ipwhois.exceptions.IPDefinedError as e: err = '%s' % e except ipwhois.exceptions.ASNRegistryError as e: err = '%s' % e except Exception as e: err = 'Error: %s' % e host = '' try: host = ipwhois.Net(ip_address).get_host(retry_count=0)[0] except Exception as e: host = 'Not available' whois_info = {"host": host, "nets": nets } if err: whois_info['error'] = err return whois_info # Taken from SickRage def anon_url(*url): """ Return a URL string consisting of the Anonymous redirect URL and an arbitrary number of values appended. """ return '' if None in url else '%s%s' % (plexpy.CONFIG.ANON_REDIRECT, ''.join(str(s) for s in url)) def upload_to_imgur(imgPath, imgTitle=''): """ Uploads an image to Imgur """ client_id = plexpy.CONFIG.IMGUR_CLIENT_ID img_url = delete_hash = '' if not client_id: logger.error(u"Tautulli Helpers :: Cannot upload poster to Imgur. No Imgur client id specified in the settings.") return img_url, delete_hash try: with open(imgPath, 'rb') as imgFile: img = imgFile.read() except IOError as e: logger.error(u"Tautulli Helpers :: Unable to read image file for Imgur: %s" % e) return img_url, delete_hash headers = {'Authorization': 'Client-ID %s' % client_id} data = {'type': 'base64', 'image': base64.b64encode(img)} if imgTitle: data['title'] = imgTitle.encode('utf-8') data['name'] = imgTitle.encode('utf-8') + '.jpg' response, err_msg, req_msg = request.request_response2('https://api.imgur.com/3/image', 'POST', headers=headers, data=data) if response and not err_msg: t = '\'' + imgTitle + '\' ' if imgTitle else '' logger.debug(u"Tautulli Helpers :: Image {}uploaded to Imgur.".format(t)) imgur_response_data = response.json().get('data') img_url = imgur_response_data.get('link', '').replace('http://', 'https://') delete_hash = imgur_response_data.get('deletehash', '') else: if err_msg: logger.error(u"Tautulli Helpers :: Unable to upload image to Imgur: {}".format(err_msg)) else: logger.error(u"Tautulli Helpers :: Unable to upload image to Imgur.") if req_msg: logger.debug(u"Tautulli Helpers :: Request response: {}".format(req_msg)) return img_url, delete_hash def delete_from_imgur(delete_hash, imgTitle=''): """ Deletes an image from Imgur """ client_id = plexpy.CONFIG.IMGUR_CLIENT_ID headers = {'Authorization': 'Client-ID %s' % client_id} response, err_msg, req_msg = request.request_response2('https://api.imgur.com/3/image/%s' % delete_hash, 'DELETE', headers=headers) if response and not err_msg: t = '\'' + imgTitle + '\' ' if imgTitle else '' logger.debug(u"Tautulli Helpers :: Image {}deleted from Imgur.".format(t)) return True else: if err_msg: logger.error(u"Tautulli Helpers :: Unable to delete image from Imgur: {}".format(err_msg)) else: logger.error(u"Tautulli Helpers :: Unable to delete image from Imgur.") return False def cache_image(url, image=None): """ Saves an image to the cache directory. If no image is provided, tries to return the image from the cache directory. """ # Create image directory if it doesn't exist imgdir = os.path.join(plexpy.CONFIG.CACHE_DIR, 'images/') if not os.path.exists(imgdir): logger.debug(u"Tautulli Helpers :: Creating image cache directory at %s" % imgdir) os.makedirs(imgdir) # Create a hash of the url to use as the filename imghash = hashlib.md5(url).hexdigest() imagefile = os.path.join(imgdir, imghash) # If an image is provided, save it to the cache directory if image: try: with open(imagefile, 'wb') as cache_file: cache_file.write(image) except IOError as e: logger.error(u"Tautulli Helpers :: Failed to cache image %s: %s" % (imagefile, e)) # Try to return the image from the cache directory if os.path.isfile(imagefile): imagetype = 'image/' + imghdr.what(os.path.abspath(imagefile)) else: imagefile = None imagetype = 'image/jpeg' return imagefile, imagetype def build_datatables_json(kwargs, dt_columns, default_sort_col=None): """ Builds datatables json data dt_columns: list of tuples [("column name", "orderable", "searchable"), ...] """ columns = [{"data": c[0], "orderable": c[1], "searchable": c[2]} for c in dt_columns] if not default_sort_col: default_sort_col = dt_columns[0][0] order_column = [c[0] for c in dt_columns].index(kwargs.pop("order_column", default_sort_col)) # Build json data json_data = {"draw": 1, "columns": columns, "order": [{"column": order_column, "dir": kwargs.pop("order_dir", "desc")}], "start": int(kwargs.pop("start", 0)), "length": int(kwargs.pop("length", 25)), "search": {"value": kwargs.pop("search", "")} } return json.dumps(json_data) def humanFileSize(bytes, si=False): if str(bytes).isdigit(): bytes = int(bytes) else: return bytes thresh = 1000 if si else 1024 if bytes < thresh: return str(bytes) + ' B' if si: units = ('kB', 'MB', 'GB', 'TB', 'PB', 'EB', 'ZB', 'YB') else: units = ('KiB', 'MiB', 'GiB', 'TiB', 'PiB', 'EiB', 'ZiB', 'YiB') u = -1 while bytes >= thresh and u < len(units): bytes /= thresh u += 1 return "{0:.1f} {1}".format(bytes, units[u]) def parse_condition_logic_string(s, num_cond=0): """ Parse a logic string into a nested list Based on http://stackoverflow.com/a/23185606 """ valid_tokens = re.compile(r'(\(|\)|and|or)') conditions_pattern = re.compile(r'{\d+}') tokens = [x.strip() for x in re.split(valid_tokens, s.lower()) if x.strip()] stack = [[]] cond_next = True bool_next = False open_bracket_next = True close_bracket_next = False nest_and = 0 nest_nest_and = 0 for i, x in enumerate(tokens): if open_bracket_next and x == '(': stack[-1].append([]) stack.append(stack[-1][-1]) cond_next = True bool_next = False open_bracket_next = True close_bracket_next = False if nest_and: nest_nest_and += 1 elif close_bracket_next and x == ')': stack.pop() if not stack: raise ValueError('opening bracket is missing') cond_next = False bool_next = True open_bracket_next = False close_bracket_next = True if nest_and > 0 and nest_nest_and > 0 and nest_and == nest_nest_and: stack.pop() nest_and -= 1 nest_nest_and -= 1 elif cond_next and re.match(conditions_pattern, x): try: num = int(x[1:-1]) except: raise ValueError('invalid condition logic') if not 0 < num <= num_cond: raise ValueError('invalid condition number in condition logic') stack[-1].append(num) cond_next = False bool_next = True open_bracket_next = False close_bracket_next = True if nest_and > nest_nest_and: stack.pop() nest_and -= 1 elif bool_next and x == 'and' and i < len(tokens)-1: stack[-1].append([]) stack.append(stack[-1][-1]) stack[-1].append(stack[-2].pop(-2)) stack[-1].append(x) cond_next = True bool_next = False open_bracket_next = True close_bracket_next = False nest_and += 1 elif bool_next and x == 'or' and i < len(tokens)-1: stack[-1].append(x) cond_next = True bool_next = False open_bracket_next = True close_bracket_next = False else: raise ValueError('invalid condition logic') if len(stack) > 1: raise ValueError('closing bracket is missing') return stack.pop() def nested_list_to_string(l): for i, x in enumerate(l): if isinstance(x, list): l[i] = nested_list_to_string(x) s = '(' + ' '.join(l) + ')' return s def eval_logic_groups_to_bool(logic_groups, eval_conds): first_cond = logic_groups[0] if isinstance(first_cond, list): result = eval_logic_groups_to_bool(first_cond, eval_conds) else: result = eval_conds[first_cond] for op, cond in zip(logic_groups[1::2], logic_groups[2::2]): if isinstance(cond, list): eval_cond = eval_logic_groups_to_bool(cond, eval_conds) else: eval_cond = eval_conds[cond] if op == 'and': result = result and eval_cond elif op == 'or': result = result or eval_cond return result def get_plexpy_url(hostname=None): if plexpy.CONFIG.ENABLE_HTTPS: scheme = 'https' else: scheme = 'http' if hostname is None and plexpy.CONFIG.HTTP_HOST == '0.0.0.0': import socket try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1) s.connect(('', 0)) hostname = s.getsockname()[0] except socket.error: hostname = socket.gethostbyname(socket.gethostname()) if not hostname: hostname = 'localhost' else: hostname = hostname or plexpy.CONFIG.HTTP_HOST if plexpy.CONFIG.HTTP_PORT not in (80, 443): port = ':' + str(plexpy.CONFIG.HTTP_PORT) else: port = '' if plexpy.CONFIG.HTTP_ROOT.strip('/'): root = '/' + plexpy.CONFIG.HTTP_ROOT.strip('/') else: root = '' return scheme + '://' + hostname + port + root def momentjs_to_arrow(format, duration=False): invalid_formats = ['Mo', 'DDDo', 'do'] if duration: invalid_formats += ['A', 'a'] for f in invalid_formats: format = format.replace(f, '') return format