Update plexapi==4.8.0

This commit is contained in:
JonnyWong16 2021-11-28 14:17:35 -08:00
parent 36b55398a8
commit 3a50981976
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
20 changed files with 522 additions and 314 deletions

View file

@ -19,6 +19,8 @@ at <https://github.com/Ousret/charset_normalizer>.
:copyright: (c) 2021 by Ahmed TAHRI
:license: MIT, see LICENSE for more details.
"""
import logging
from .api import from_bytes, from_fp, from_path, normalize
from .legacy import (
CharsetDetector,
@ -28,6 +30,7 @@ from .legacy import (
detect,
)
from .models import CharsetMatch, CharsetMatches
from .utils import set_logging_handler
from .version import VERSION, __version__
__all__ = (
@ -44,4 +47,10 @@ __all__ = (
"CharsetDoctor",
"__version__",
"VERSION",
"set_logging_handler",
)
# Attach a NullHandler to the top level logger by default
# https://docs.python.org/3.3/howto/logging.html#configuring-logging-for-a-library
logging.getLogger("charset_normalizer").addHandler(logging.NullHandler())

View file

@ -1,3 +1,4 @@
import logging
from os.path import basename, splitext
from typing import BinaryIO, List, Optional, Set
@ -6,8 +7,6 @@ try:
except ImportError: # pragma: no cover
PathLike = str # type: ignore
import logging
from .cd import (
coherence_ratio,
encoding_languages,
@ -27,11 +26,10 @@ from .utils import (
)
logger = logging.getLogger("charset_normalizer")
logger.setLevel(logging.DEBUG)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s"))
logger.addHandler(handler)
explain_handler = logging.StreamHandler()
explain_handler.setFormatter(
logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")
)
def from_bytes(
@ -57,6 +55,9 @@ def from_bytes(
purpose.
This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32.
By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain'
toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging.
Custom logging format and handler can be set manually.
"""
if not isinstance(sequences, (bytearray, bytes)):
@ -66,10 +67,8 @@ def from_bytes(
)
)
if not explain:
logger.setLevel(logging.CRITICAL)
else:
logger.setLevel(logging.INFO)
if explain:
logger.addHandler(explain_handler)
length = len(sequences) # type: int
@ -77,6 +76,8 @@ def from_bytes(
logger.warning(
"Given content is empty, stopping the process very early, returning empty utf_8 str match"
)
if explain:
logger.removeHandler(explain_handler)
return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")])
if cp_isolation is not None:
@ -131,7 +132,7 @@ def from_bytes(
prioritized_encodings = [] # type: List[str]
specified_encoding = (
any_specified_encoding(sequences) if preemptive_behaviour is True else None
any_specified_encoding(sequences) if preemptive_behaviour else None
) # type: Optional[str]
if specified_encoding is not None:
@ -185,7 +186,7 @@ def from_bytes(
encoding_iana
) # type: bool
if encoding_iana in {"utf_16", "utf_32"} and bom_or_sig_available is False:
if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available:
logger.info(
"Encoding %s wont be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.",
encoding_iana,
@ -241,7 +242,7 @@ def from_bytes(
continue
r_ = range(
0 if bom_or_sig_available is False else len(sig_payload),
0 if not bom_or_sig_available else len(sig_payload),
length,
int(length / steps),
)
@ -261,29 +262,40 @@ def from_bytes(
max_chunk_gave_up = int(len(r_) / 4) # type: int
if max_chunk_gave_up < 2:
max_chunk_gave_up = 2
max_chunk_gave_up = max(max_chunk_gave_up, 2)
early_stop_count = 0 # type: int
md_chunks = [] # type: List[str]
md_ratios = []
for i in r_:
if i + chunk_size > length + 8:
continue
cut_sequence = sequences[i : i + chunk_size]
if bom_or_sig_available and strip_sig_or_bom is False:
cut_sequence = sig_payload + cut_sequence
chunk = cut_sequence.decode(encoding_iana, errors="ignore") # type: str
try:
chunk = cut_sequence.decode(
encoding_iana,
errors="ignore" if is_multi_byte_decoder else "strict",
) # type: str
except UnicodeDecodeError as e: # Lazy str loading may have missed something there
logger.warning(
"LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s",
encoding_iana,
str(e),
)
early_stop_count = max_chunk_gave_up
break
# multi-byte bad cutting detector and adjustment
# not the cleanest way to perform that fix but clever enough for now.
if is_multi_byte_decoder and i > 0 and sequences[i] >= 0x80:
chunk_partial_size_chk = (
16 if chunk_size > 16 else chunk_size
) # type: int
chunk_partial_size_chk = min(chunk_size, 16) # type: int
if (
decoded_payload
@ -312,11 +324,9 @@ def from_bytes(
):
break
if md_ratios:
mean_mess_ratio = sum(md_ratios) / len(md_ratios) # type: float
else:
mean_mess_ratio = 0.0
mean_mess_ratio = (
sum(md_ratios) / len(md_ratios) if md_ratios else 0.0
) # type: float
if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up:
tested_but_soft_failure.append(encoding_iana)
logger.warning(
@ -375,6 +385,20 @@ def from_bytes(
)
)
# We might want to check the sequence again with the whole content
# Only if initial MD/CD tests passes
if is_too_large_sequence and not is_multi_byte_decoder:
try:
sequences[int(50e3) :].decode(encoding_iana, errors="strict")
except UnicodeDecodeError as e:
logger.warning(
"LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s",
encoding_iana,
str(e),
)
tested_but_hard_failure.append(encoding_iana)
continue
results.append(
CharsetMatch(
sequences,
@ -393,6 +417,8 @@ def from_bytes(
logger.info(
"%s is most likely the one. Stopping the process.", encoding_iana
)
if explain:
logger.removeHandler(explain_handler)
return CharsetMatches([results[encoding_iana]])
if encoding_iana == sig_encoding:
@ -400,6 +426,8 @@ def from_bytes(
"%s is most likely the one as we detected a BOM or SIG within the beginning of the sequence.",
encoding_iana,
)
if explain:
logger.removeHandler(explain_handler)
return CharsetMatches([results[encoding_iana]])
if len(results) == 0:
@ -428,6 +456,9 @@ def from_bytes(
logger.warning("ascii will be used as a fallback match")
results.append(fallback_ascii)
if explain:
logger.removeHandler(explain_handler)
return results

View file

@ -5,7 +5,7 @@ from functools import lru_cache
from typing import Dict, List, Optional, Tuple
from .assets import FREQUENCIES
from .constant import KO_NAMES, TOO_SMALL_SEQUENCE, ZH_NAMES
from .constant import KO_NAMES, LANGUAGE_SUPPORTED_COUNT, TOO_SMALL_SEQUENCE, ZH_NAMES
from .md import is_suspiciously_successive_range
from .models import CoherenceMatches
from .utils import (
@ -110,6 +110,23 @@ def mb_encoding_languages(iana_name: str) -> List[str]:
return []
@lru_cache(maxsize=LANGUAGE_SUPPORTED_COUNT)
def get_target_features(language: str) -> Tuple[bool, bool]:
"""
Determine main aspects from a supported language if it contains accents and if is pure Latin.
"""
target_have_accents = False # type: bool
target_pure_latin = True # type: bool
for character in FREQUENCIES[language]:
if not target_have_accents and is_accentuated(character):
target_have_accents = True
if target_pure_latin and is_latin(character) is False:
target_pure_latin = False
return target_have_accents, target_pure_latin
def alphabet_languages(
characters: List[str], ignore_non_latin: bool = False
) -> List[str]:
@ -118,23 +135,11 @@ def alphabet_languages(
"""
languages = [] # type: List[Tuple[str, float]]
source_have_accents = False # type: bool
for character in characters:
if is_accentuated(character):
source_have_accents = True
break
source_have_accents = any(is_accentuated(character) for character in characters)
for language, language_characters in FREQUENCIES.items():
target_have_accents = False # type: bool
target_pure_latin = True # type: bool
for language_character in language_characters:
if target_have_accents is False and is_accentuated(language_character):
target_have_accents = True
if target_pure_latin is True and is_latin(language_character) is False:
target_pure_latin = False
target_have_accents, target_pure_latin = get_target_features(language)
if ignore_non_latin and target_pure_latin is False:
continue
@ -263,8 +268,6 @@ def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches:
The return type is the same as coherence_ratio.
"""
per_language_ratios = OrderedDict() # type: Dict[str, List[float]]
merge = [] # type: CoherenceMatches
for result in results:
for sub_result in result:
language, ratio = sub_result
@ -273,17 +276,16 @@ def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches:
continue
per_language_ratios[language].append(ratio)
for language in per_language_ratios:
merge.append(
merge = [
(
language,
round(
sum(per_language_ratios[language])
/ len(per_language_ratios[language]),
sum(per_language_ratios[language]) / len(per_language_ratios[language]),
4,
),
)
)
for language in per_language_ratios
]
return sorted(merge, key=lambda x: x[1], reverse=True)
@ -298,14 +300,11 @@ def coherence_ratio(
"""
results = [] # type: List[Tuple[str, float]]
lg_inclusion_list = [] # type: List[str]
ignore_non_latin = False # type: bool
sufficient_match_count = 0 # type: int
if lg_inclusion is not None:
lg_inclusion_list = lg_inclusion.split(",")
lg_inclusion_list = lg_inclusion.split(",") if lg_inclusion is not None else []
if "Latin Based" in lg_inclusion_list:
ignore_non_latin = True
lg_inclusion_list.remove("Latin Based")
@ -314,7 +313,7 @@ def coherence_ratio(
sequence_frequencies = Counter(layer) # type: Counter
most_common = sequence_frequencies.most_common()
character_count = sum([o for c, o in most_common]) # type: int
character_count = sum(o for c, o in most_common) # type: int
if character_count <= TOO_SMALL_SEQUENCE:
continue

View file

@ -235,8 +235,7 @@ def cli_detect(argv: List[str] = None) -> int:
o_.insert(-1, best_guess.encoding)
if my_file.closed is False:
my_file.close()
else:
if (
elif (
args.force is False
and query_yes_no(
'Are you sure to normalize "{}" by replacing it ?'.format(
@ -277,7 +276,7 @@ def cli_detect(argv: List[str] = None) -> int:
print(
", ".join(
[
el.encoding if el.encoding else "undefined"
el.encoding or "undefined"
for el in x_
if el.path == abspath(my_file.name)
]

View file

@ -4,6 +4,8 @@ from encodings.aliases import aliases
from re import IGNORECASE, compile as re_compile
from typing import Dict, List, Set, Union
from .assets import FREQUENCIES
# Contain for each eligible encoding a list of/item bytes SIG/BOM
ENCODING_MARKS = OrderedDict(
[
@ -30,7 +32,7 @@ TOO_BIG_SEQUENCE = int(10e6) # type: int
UTF8_MAXIMAL_ALLOCATION = 1112064 # type: int
UNICODE_RANGES_COMBINED = {
"Control character": range(0, 31 + 1),
"Control character": range(31 + 1),
"Basic Latin": range(32, 127 + 1),
"Latin-1 Supplement": range(128, 255 + 1),
"Latin Extended-A": range(256, 383 + 1),
@ -311,6 +313,7 @@ UNICODE_RANGES_COMBINED = {
"Variation Selectors Supplement": range(917760, 917999 + 1),
} # type: Dict[str, range]
UNICODE_SECONDARY_RANGE_KEYWORD = [
"Supplement",
"Extended",
@ -352,11 +355,10 @@ IANA_SUPPORTED_SIMILAR = {
"cp1140": ["cp037", "cp1026", "cp273", "cp500"],
"cp1250": ["iso8859_2"],
"cp1251": ["kz1048", "ptcp154"],
"cp1252": ["cp1258", "iso8859_15", "iso8859_9", "latin_1"],
"cp1252": ["iso8859_15", "iso8859_9", "latin_1"],
"cp1253": ["iso8859_7"],
"cp1254": ["cp1258", "iso8859_15", "iso8859_9", "latin_1"],
"cp1254": ["iso8859_15", "iso8859_9", "latin_1"],
"cp1257": ["iso8859_13"],
"cp1258": ["cp1252", "cp1254", "iso8859_9", "latin_1"],
"cp273": ["cp037", "cp1026", "cp1140", "cp500"],
"cp437": ["cp850", "cp858", "cp860", "cp861", "cp862", "cp863", "cp865"],
"cp500": ["cp037", "cp1026", "cp1140", "cp273"],
@ -494,3 +496,5 @@ KO_NAMES = {"johab", "cp949", "euc_kr"} # type: Set[str]
ZH_NAMES = {"big5", "cp950", "big5hkscs", "hz"} # type: Set[str]
NOT_PRINTABLE_PATTERN = re_compile(r"[0-9\W\n\r\t]+")
LANGUAGE_SUPPORTED_COUNT = len(FREQUENCIES) # type: int

View file

@ -40,11 +40,11 @@ class MessDetectorPlugin:
"""
raise NotImplementedError # pragma: nocover
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
"""
Permit to reset the plugin to the initial state.
"""
raise NotImplementedError # pragma: nocover
raise NotImplementedError
@property
def ratio(self) -> float:
@ -85,7 +85,7 @@ class TooManySymbolOrPunctuationPlugin(MessDetectorPlugin):
self._last_printable_char = character
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._punctuation_count = 0
self._character_count = 0
self._symbol_count = 0
@ -116,7 +116,7 @@ class TooManyAccentuatedPlugin(MessDetectorPlugin):
if is_accentuated(character):
self._accentuated_count += 1
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._character_count = 0
self._accentuated_count = 0
@ -147,7 +147,7 @@ class UnprintablePlugin(MessDetectorPlugin):
self._unprintable_count += 1
self._character_count += 1
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._unprintable_count = 0
@property
@ -170,18 +170,19 @@ class SuspiciousDuplicateAccentPlugin(MessDetectorPlugin):
def feed(self, character: str) -> None:
self._character_count += 1
if self._last_latin_character is not None:
if is_accentuated(character) and is_accentuated(self._last_latin_character):
if (
self._last_latin_character is not None
and is_accentuated(character)
and is_accentuated(self._last_latin_character)
):
if character.isupper() and self._last_latin_character.isupper():
self._successive_count += 1
# Worse if its the same char duplicated with different accent.
if remove_accent(character) == remove_accent(
self._last_latin_character
):
if remove_accent(character) == remove_accent(self._last_latin_character):
self._successive_count += 1
self._last_latin_character = character
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._successive_count = 0
self._character_count = 0
self._last_latin_character = None
@ -228,7 +229,7 @@ class SuspiciousRange(MessDetectorPlugin):
self._last_printable_seen = character
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._character_count = 0
self._suspicious_successive_range_count = 0
self._last_printable_seen = None
@ -252,6 +253,8 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
def __init__(self) -> None:
self._word_count = 0 # type: int
self._bad_word_count = 0 # type: int
self._foreign_long_count = 0 # type: int
self._is_current_word_bad = False # type: bool
self._foreign_long_watch = False # type: bool
@ -271,7 +274,7 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
self._buffer_accent_count += 1
if (
self._foreign_long_watch is False
and is_latin(character) is False
and (is_latin(character) is False or is_accentuated(character))
and is_cjk(character) is False
and is_hangul(character) is False
and is_katakana(character) is False
@ -290,9 +293,16 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
self._character_count += buffer_length
if buffer_length >= 4 and self._buffer_accent_count / buffer_length > 0.34:
if buffer_length >= 4:
if self._buffer_accent_count / buffer_length > 0.34:
self._is_current_word_bad = True
# Word/Buffer ending with a upper case accentuated letter are so rare,
# that we will consider them all as suspicious. Same weight as foreign_long suspicious.
if is_accentuated(self._buffer[-1]) and self._buffer[-1].isupper():
self._foreign_long_count += 1
self._is_current_word_bad = True
if buffer_length >= 24 and self._foreign_long_watch:
self._foreign_long_count += 1
self._is_current_word_bad = True
if self._is_current_word_bad:
@ -311,7 +321,7 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
self._is_current_word_bad = True
self._buffer += character
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._buffer = ""
self._is_current_word_bad = False
self._foreign_long_watch = False
@ -319,10 +329,11 @@ class SuperWeirdWordPlugin(MessDetectorPlugin):
self._word_count = 0
self._character_count = 0
self._bad_character_count = 0
self._foreign_long_count = 0
@property
def ratio(self) -> float:
if self._word_count <= 10:
if self._word_count <= 10 and self._foreign_long_count == 0:
return 0.0
return self._bad_character_count / self._character_count
@ -342,13 +353,13 @@ class CjkInvalidStopPlugin(MessDetectorPlugin):
return True
def feed(self, character: str) -> None:
if character in ["", ""]:
if character in {"", ""}:
self._wrong_stop_count += 1
return
if is_cjk(character):
self._cjk_character_count += 1
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._wrong_stop_count = 0
self._cjk_character_count = 0
@ -418,7 +429,7 @@ class ArchaicUpperLowerPlugin(MessDetectorPlugin):
self._character_count_since_last_sep += 1
self._last_alpha_seen = character
def reset(self) -> None:
def reset(self) -> None: # pragma: no cover
self._character_count = 0
self._character_count_since_last_sep = 0
self._successive_upper_lower_count = 0
@ -453,6 +464,13 @@ def is_suspiciously_successive_range(
if "Emoticons" in unicode_range_a or "Emoticons" in unicode_range_b:
return False
# Latin characters can be accompanied with a combining diacritical mark
# eg. Vietnamese.
if ("Latin" in unicode_range_a or "Latin" in unicode_range_b) and (
"Combining" in unicode_range_a or "Combining" in unicode_range_b
):
return False
keywords_range_a, keywords_range_b = unicode_range_a.split(
" "
), unicode_range_b.split(" ")
@ -472,8 +490,9 @@ def is_suspiciously_successive_range(
),
unicode_range_b in ("Hiragana", "Katakana"),
)
if range_a_jp_chars or range_b_jp_chars:
if "CJK" in unicode_range_a or "CJK" in unicode_range_b:
if (range_a_jp_chars or range_b_jp_chars) and (
"CJK" in unicode_range_a or "CJK" in unicode_range_b
):
return False
if range_a_jp_chars and range_b_jp_chars:
return False
@ -509,7 +528,7 @@ def mess_ratio(
md_class() for md_class in MessDetectorPlugin.__subclasses__()
] # type: List[MessDetectorPlugin]
length = len(decoded_sequence) # type: int
length = len(decoded_sequence) + 1 # type: int
mean_mess_ratio = 0.0 # type: float
@ -520,7 +539,7 @@ def mess_ratio(
else:
intermediary_mean_mess_ratio_calc = 128
for character, index in zip(decoded_sequence, range(0, length)):
for character, index in zip(decoded_sequence + "\n", range(length)):
for detector in detectors:
if detector.eligible(character):
detector.feed(character)
@ -528,7 +547,7 @@ def mess_ratio(
if (
index > 0 and index % intermediary_mean_mess_ratio_calc == 0
) or index == length - 1:
mean_mess_ratio = sum([dt.ratio for dt in detectors])
mean_mess_ratio = sum(dt.ratio for dt in detectors)
if mean_mess_ratio >= maximum_threshold:
break

View file

@ -284,8 +284,7 @@ class CharsetMatches:
self._results = sorted(results) if results else [] # type: List[CharsetMatch]
def __iter__(self) -> Iterator[CharsetMatch]:
for result in self._results:
yield result
yield from self._results
def __getitem__(self, item: Union[int, str]) -> CharsetMatch:
"""

View file

@ -4,6 +4,7 @@ except ImportError:
import unicodedata # type: ignore[no-redef]
import importlib
import logging
from codecs import IncrementalDecoder
from encodings.aliases import aliases
from functools import lru_cache
@ -122,7 +123,7 @@ def is_emoticon(character: str) -> bool:
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
def is_separator(character: str) -> bool:
if character.isspace() or character in ["", "+", ",", ";", "<", ">"]:
if character.isspace() or character in {"", "+", ",", ";", "<", ">"}:
return True
character_category = unicodedata.category(character) # type: str
@ -138,7 +139,7 @@ def is_case_variable(character: str) -> bool:
def is_private_use_only(character: str) -> bool:
character_category = unicodedata.category(character) # type: str
return "Co" == character_category
return character_category == "Co"
@lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION)
@ -193,11 +194,7 @@ def is_thai(character: str) -> bool:
@lru_cache(maxsize=len(UNICODE_RANGES_COMBINED))
def is_unicode_range_secondary(range_name: str) -> bool:
for keyword in UNICODE_SECONDARY_RANGE_KEYWORD:
if keyword in range_name:
return True
return False
return any(keyword in range_name for keyword in UNICODE_SECONDARY_RANGE_KEYWORD)
def any_specified_encoding(sequence: bytes, search_zone: int = 4096) -> Optional[str]:
@ -211,9 +208,7 @@ def any_specified_encoding(sequence: bytes, search_zone: int = 4096) -> Optional
results = findall(
RE_POSSIBLE_ENCODING_INDICATION,
sequence[: seq_len if seq_len <= search_zone else search_zone].decode(
"ascii", errors="ignore"
),
sequence[: min(seq_len, search_zone)].decode("ascii", errors="ignore"),
) # type: List[str]
if len(results) == 0:
@ -278,7 +273,7 @@ def iana_name(cp_name: str, strict: bool = True) -> str:
cp_name = cp_name.lower().replace("-", "_")
for encoding_alias, encoding_iana in aliases.items():
if cp_name == encoding_alias or cp_name == encoding_iana:
if cp_name in [encoding_alias, encoding_iana]:
return encoding_iana
if strict:
@ -314,7 +309,7 @@ def cp_similarity(iana_name_a: str, iana_name_b: str) -> float:
character_match_count = 0 # type: int
for i in range(0, 255):
for i in range(255):
to_be_decoded = bytes([i]) # type: bytes
if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded):
character_match_count += 1
@ -331,3 +326,17 @@ def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool:
iana_name_a in IANA_SUPPORTED_SIMILAR
and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a]
)
def set_logging_handler(
name: str = "charset_normalizer",
level: int = logging.INFO,
format_string: str = "%(asctime)s | %(levelname)s | %(message)s",
) -> None:
logger = logging.getLogger(name)
logger.setLevel(level)
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter(format_string))
logger.addHandler(handler)

View file

@ -2,5 +2,5 @@
Expose version
"""
__version__ = "2.0.7"
__version__ = "2.0.8"
VERSION = __version__.split(".")

View file

@ -29,14 +29,18 @@ class AlertListener(threading.Thread):
callback (func): Callback function to call on received messages. The callback function
will be sent a single argument 'data' which will contain a dictionary of data
received from the server. :samp:`def my_callback(data): ...`
callbackError (func): Callback function to call on errors. The callback function
will be sent a single argument 'error' which will contain the Error object.
:samp:`def my_callback(error): ...`
"""
key = '/:/websockets/notifications'
def __init__(self, server, callback=None):
def __init__(self, server, callback=None, callbackError=None):
super(AlertListener, self).__init__()
self.daemon = True
self._server = server
self._callback = callback
self._callbackError = callbackError
self._ws = None
def run(self):
@ -84,4 +88,9 @@ class AlertListener(threading.Thread):
This is to support compatibility with current and previous releases of websocket-client.
"""
err = args[-1]
try:
log.error('AlertListener Error: %s', err)
if self._callbackError:
self._callbackError(err)
except Exception as err: # pragma: no cover
log.error('AlertListener Error: Error: %s', err)

View file

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import os
from urllib.parse import quote_plus
from plexapi import library, media, utils
@ -205,23 +206,20 @@ class Artist(Audio, AdvancedSettingsMixin, ArtMixin, PosterMixin, RatingMixin, S
""" Alias of :func:`~plexapi.audio.Artist.track`. """
return self.track(title, album, track)
def download(self, savepath=None, keep_original_name=False, **kwargs):
""" Downloads all tracks for the artist to the specified location.
def download(self, savepath=None, keep_original_name=False, subfolders=False, **kwargs):
""" Download all tracks from the artist. See :func:`~plexapi.base.Playable.download` for details.
Parameters:
savepath (str): Title of the track to return.
keep_original_name (bool): Set True to keep the original filename as stored in
the Plex server. False will create a new filename with the format
"<Atrist> - <Album> <Track>".
kwargs (dict): If specified, a :func:`~plexapi.audio.Track.getStreamURL` will
be returned and the additional arguments passed in will be sent to that
function. If kwargs is not specified, the media items will be downloaded
and saved to disk.
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original filename otherwise
a friendlier filename is generated.
subfolders (bool): True to separate tracks in to album folders.
**kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`.
"""
filepaths = []
for album in self.albums():
for track in album.tracks():
filepaths += track.download(savepath, keep_original_name, **kwargs)
for track in self.tracks():
_savepath = os.path.join(savepath, track.parentTitle) if subfolders else savepath
filepaths += track.download(_savepath, keep_original_name, **kwargs)
return filepaths
@ -314,17 +312,13 @@ class Album(Audio, ArtMixin, PosterMixin, RatingMixin, UnmatchMatchMixin,
return self.fetchItem(self.parentKey)
def download(self, savepath=None, keep_original_name=False, **kwargs):
""" Downloads all tracks for the artist to the specified location.
""" Download all tracks from the album. See :func:`~plexapi.base.Playable.download` for details.
Parameters:
savepath (str): Title of the track to return.
keep_original_name (bool): Set True to keep the original filename as stored in
the Plex server. False will create a new filename with the format
"<Atrist> - <Album> <Track>".
kwargs (dict): If specified, a :func:`~plexapi.audio.Track.getStreamURL` will
be returned and the additional arguments passed in will be sent to that
function. If kwargs is not specified, the media items will be downloaded
and saved to disk.
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original filename otherwise
a friendlier filename is generated.
**kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`.
"""
filepaths = []
for track in self.tracks():
@ -398,7 +392,8 @@ class Track(Audio, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixin,
def _prettyfilename(self):
""" Returns a filename for use in download. """
return '%s - %s %s' % (self.grandparentTitle, self.parentTitle, self.title)
return '%s - %s - %s - %s' % (
self.grandparentTitle, self.parentTitle, str(self.trackNumber).zfill(2), self.title)
def album(self):
""" Return the track's :class:`~plexapi.audio.Album`. """

View file

@ -681,34 +681,50 @@ class Playable(object):
client.playMedia(self)
def download(self, savepath=None, keep_original_name=False, **kwargs):
""" Downloads this items media to the specified location. Returns a list of
""" Downloads the media item to the specified location. Returns a list of
filepaths that have been saved to disk.
Parameters:
savepath (str): Title of the track to return.
keep_original_name (bool): Set True to keep the original filename as stored in
the Plex server. False will create a new filename with the format
"<Artist> - <Album> <Track>".
kwargs (dict): If specified, a :func:`~plexapi.audio.Track.getStreamURL` will
be returned and the additional arguments passed in will be sent to that
function. If kwargs is not specified, the media items will be downloaded
and saved to disk.
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original filename otherwise
a friendlier filename is generated. See filenames below.
**kwargs (dict): Additional options passed into :func:`~plexapi.audio.Track.getStreamURL`
to download a transcoded stream, otherwise the media item will be downloaded
as-is and saved to disk.
**Filenames**
* Movie: ``<title> (<year>)``
* Episode: ``<show title> - s00e00 - <episode title>``
* Track: ``<artist title> - <album title> - 00 - <track title>``
* Photo: ``<photoalbum title> - <photo/clip title>`` or ``<photo/clip title>``
"""
filepaths = []
locations = [i for i in self.iterParts() if i]
for location in locations:
filename = location.file
if keep_original_name is False:
filename = '%s.%s' % (self._prettyfilename(), location.container)
# So this seems to be a alot slower but allows transcode.
parts = [i for i in self.iterParts() if i]
for part in parts:
if not keep_original_name:
filename = utils.cleanFilename('%s.%s' % (self._prettyfilename(), part.container))
else:
filename = part.file
if kwargs:
# So this seems to be a alot slower but allows transcode.
download_url = self.getStreamURL(**kwargs)
else:
download_url = self._server.url('%s?download=1' % location.key)
filepath = utils.download(download_url, self._server._token, filename=filename,
savepath=savepath, session=self._server._session)
download_url = self._server.url('%s?download=1' % part.key)
filepath = utils.download(
download_url,
self._server._token,
filename=filename,
savepath=savepath,
session=self._server._session
)
if filepath:
filepaths.append(filepath)
return filepaths
def stop(self, reason=''):

View file

@ -3,7 +3,7 @@
# Library version
MAJOR_VERSION = 4
MINOR_VERSION = 7
PATCH_VERSION = 2
MINOR_VERSION = 8
PATCH_VERSION = 0
__short_version__ = f"{MAJOR_VERSION}.{MINOR_VERSION}"
__version__ = f"{__short_version__}.{PATCH_VERSION}"

View file

@ -26,47 +26,61 @@ class Library(PlexObject):
def _loadData(self, data):
self._data = data
self._sectionsByID = {} # cached Section UUIDs
self.identifier = data.attrib.get('identifier')
self.mediaTagVersion = data.attrib.get('mediaTagVersion')
self.title1 = data.attrib.get('title1')
self.title2 = data.attrib.get('title2')
self._sectionsByID = {} # cached sections by key
self._sectionsByTitle = {} # cached sections by title
def _loadSections(self):
""" Loads and caches all the library sections. """
key = '/library/sections'
self._sectionsByID = {}
self._sectionsByTitle = {}
for elem in self._server.query(key):
for cls in (MovieSection, ShowSection, MusicSection, PhotoSection):
if elem.attrib.get('type') == cls.TYPE:
section = cls(self._server, elem, key)
self._sectionsByID[section.key] = section
self._sectionsByTitle[section.title.lower()] = section
def sections(self):
""" Returns a list of all media sections in this library. Library sections may be any of
:class:`~plexapi.library.MovieSection`, :class:`~plexapi.library.ShowSection`,
:class:`~plexapi.library.MusicSection`, :class:`~plexapi.library.PhotoSection`.
"""
key = '/library/sections'
sections = []
for elem in self._server.query(key):
for cls in (MovieSection, ShowSection, MusicSection, PhotoSection):
if elem.attrib.get('type') == cls.TYPE:
section = cls(self._server, elem, key)
self._sectionsByID[section.key] = section
sections.append(section)
return sections
self._loadSections()
return list(self._sectionsByID.values())
def section(self, title=None):
def section(self, title):
""" Returns the :class:`~plexapi.library.LibrarySection` that matches the specified title.
Parameters:
title (str): Title of the section to return.
"""
for section in self.sections():
if section.title.lower() == title.lower():
return section
raise NotFound('Invalid library section: %s' % title)
if not self._sectionsByTitle or title not in self._sectionsByTitle:
self._loadSections()
try:
return self._sectionsByTitle[title.lower()]
except KeyError:
raise NotFound('Invalid library section: %s' % title) from None
def sectionByID(self, sectionID):
""" Returns the :class:`~plexapi.library.LibrarySection` that matches the specified sectionID.
Parameters:
sectionID (int): ID of the section to return.
Raises:
:exc:`~plexapi.exceptions.NotFound`: The library section ID is not found on the server.
"""
if not self._sectionsByID or sectionID not in self._sectionsByID:
self.sections()
self._loadSections()
try:
return self._sectionsByID[sectionID]
except KeyError:
raise NotFound('Invalid library sectionID: %s' % sectionID) from None
def all(self, **kwargs):
""" Returns a list of all media from all library sections.
@ -356,6 +370,9 @@ class LibrarySection(PlexObject):
self._filterTypes = None
self._fieldTypes = None
self._totalViewSize = None
self._totalSize = None
self._totalDuration = None
self._totalStorage = None
def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, **kwargs):
""" Load the specified key to find and build all items with the specified tag
@ -394,7 +411,36 @@ class LibrarySection(PlexObject):
@property
def totalSize(self):
""" Returns the total number of items in the library for the default library type. """
return self.totalViewSize(includeCollections=False)
if self._totalSize is None:
self._totalSize = self.totalViewSize(includeCollections=False)
return self._totalSize
@property
def totalDuration(self):
""" Returns the total duration (in milliseconds) of items in the library. """
if self._totalDuration is None:
self._getTotalDurationStorage()
return self._totalDuration
@property
def totalStorage(self):
""" Returns the total storage (in bytes) of items in the library. """
if self._totalStorage is None:
self._getTotalDurationStorage()
return self._totalStorage
def _getTotalDurationStorage(self):
""" Queries the Plex server for the total library duration and storage and caches the values. """
data = self._server.query('/media/providers?includeStorage=1')
xpath = (
'./MediaProvider[@identifier="com.plexapp.plugins.library"]'
'/Feature[@type="content"]'
'/Directory[@id="%s"]'
) % self.key
directory = next(iter(data.findall(xpath)), None)
if directory:
self._totalDuration = utils.cast(int, directory.attrib.get('durationTotal'))
self._totalStorage = utils.cast(int, directory.attrib.get('storageTotal'))
def totalViewSize(self, libtype=None, includeCollections=True):
""" Returns the total number of items in the library for a specified libtype.
@ -432,8 +478,12 @@ class LibrarySection(PlexObject):
log.error(msg)
raise
def reload(self, key=None):
return self._server.library.section(self.title)
def reload(self):
""" Reload the data for the library section. """
self._server.library._loadSections()
newLibrary = self._server.library.sectionByID(self.key)
self.__dict__.update(newLibrary.__dict__)
return self
def edit(self, agent=None, **kwargs):
""" Edit a library (Note: agent is required). See :class:`~plexapi.library.Library` for example usage.
@ -446,11 +496,6 @@ class LibrarySection(PlexObject):
part = '/library/sections/%s?agent=%s&%s' % (self.key, agent, urlencode(kwargs))
self._server.query(part, method=self._server._session.put)
# Reload this way since the self.key dont have a full path, but is simply a id.
for s in self._server.library.sections():
if s.key == self.key:
return s
def get(self, title):
""" Returns the media item with the specified title.

View file

@ -79,13 +79,16 @@ class Media(PlexObject):
self.make = data.attrib.get('make')
self.model = data.attrib.get('model')
parent = self._parent()
self._parentKey = parent.key
@property
def isOptimizedVersion(self):
""" Returns True if the media is a Plex optimized version. """
return self.proxyType == utils.SEARCHTYPES['optimizedVersion']
def delete(self):
part = self._initpath + '/media/%s' % self.id
part = '%s/media/%s' % (self._parentKey, self.id)
try:
return self._server.query(part, method=self._server._session.delete)
except BadRequest:

View file

@ -70,9 +70,6 @@ class MyPlexAccount(PlexObject):
PLEXSERVERS = 'https://plex.tv/api/servers/{machineId}' # get
FRIENDUPDATE = 'https://plex.tv/api/friends/{userId}' # put with args, delete
REMOVEHOMEUSER = 'https://plex.tv/api/home/users/{userId}' # delete
REMOVEINVITE = 'https://plex.tv/api/invites/requested/{userId}?friend=1&server=1&home=1' # delete
REQUESTED = 'https://plex.tv/api/invites/requested' # get
REQUESTS = 'https://plex.tv/api/invites/requests' # get
SIGNIN = 'https://plex.tv/users/sign_in.xml' # get with auth
WEBHOOKS = 'https://plex.tv/api/v2/user/webhooks' # get, post with data
OPTOUTS = 'https://plex.tv/api/v2/user/%(userUUID)s/settings/opt_outs' # get
@ -365,26 +362,55 @@ class MyPlexAccount(PlexObject):
return self.query(url, self._session.post, headers=headers)
def removeFriend(self, user):
""" Remove the specified user from all sharing.
""" Remove the specified user from your friends.
Parameters:
user (str): MyPlexUser, username, email of the user to be added.
user (str): :class:`~plexapi.myplex.MyPlexUser`, username, or email of the user to be removed.
"""
user = self.user(user)
url = self.FRIENDUPDATE if user.friend else self.REMOVEINVITE
url = url.format(userId=user.id)
user = user if isinstance(user, MyPlexUser) else self.user(user)
url = self.FRIENDUPDATE.format(userId=user.id)
return self.query(url, self._session.delete)
def removeHomeUser(self, user):
""" Remove the specified managed user from home.
""" Remove the specified user from your home users.
Parameters:
user (str): MyPlexUser, username, email of the user to be removed from home.
user (str): :class:`~plexapi.myplex.MyPlexUser`, username, or email of the user to be removed.
"""
user = self.user(user)
user = user if isinstance(user, MyPlexUser) else self.user(user)
url = self.REMOVEHOMEUSER.format(userId=user.id)
return self.query(url, self._session.delete)
def acceptInvite(self, user):
""" Accept a pending firend invite from the specified user.
Parameters:
user (str): :class:`~plexapi.myplex.MyPlexInvite`, username, or email of the friend invite to accept.
"""
invite = user if isinstance(user, MyPlexInvite) else self.pendingInvite(user, includeSent=False)
params = {
'friend': int(invite.friend),
'home': int(invite.home),
'server': int(invite.server)
}
url = MyPlexInvite.REQUESTS + '/%s' % invite.id + utils.joinArgs(params)
return self.query(url, self._session.put)
def cancelInvite(self, user):
""" Cancel a pending firend invite for the specified user.
Parameters:
user (str): :class:`~plexapi.myplex.MyPlexInvite`, username, or email of the friend invite to cancel.
"""
invite = user if isinstance(user, MyPlexInvite) else self.pendingInvite(user, includeReceived=False)
params = {
'friend': int(invite.friend),
'home': int(invite.home),
'server': int(invite.server)
}
url = MyPlexInvite.REQUESTED + '/%s' % invite.id + utils.joinArgs(params)
return self.query(url, self._session.delete)
def updateFriend(self, user, server, sections=None, removeSections=False, allowSync=None, allowCameraUpload=None,
allowChannels=None, filterMovies=None, filterTelevision=None, filterMusic=None):
""" Update the specified user's share settings.
@ -455,7 +481,7 @@ class MyPlexAccount(PlexObject):
return response_servers, response_filters
def user(self, username):
""" Returns the :class:`~plexapi.myplex.MyPlexUser` that matches the email or username specified.
""" Returns the :class:`~plexapi.myplex.MyPlexUser` that matches the specified username or email.
Parameters:
username (str): Username, email or id of the user to return.
@ -474,12 +500,43 @@ class MyPlexAccount(PlexObject):
def users(self):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexUser` objects connected to your account.
This includes both friends and pending invites. You can reference the user.friend to
distinguish between the two.
"""
friends = [MyPlexUser(self, elem) for elem in self.query(MyPlexUser.key)]
requested = [MyPlexUser(self, elem, self.REQUESTED) for elem in self.query(self.REQUESTED)]
return friends + requested
elem = self.query(MyPlexUser.key)
return self.findItems(elem, cls=MyPlexUser)
def pendingInvite(self, username, includeSent=True, includeReceived=True):
""" Returns the :class:`~plexapi.myplex.MyPlexInvite` that matches the specified username or email.
Note: This can be a pending invite sent from your account or received to your account.
Parameters:
username (str): Username, email or id of the user to return.
includeSent (bool): True to include sent invites.
includeReceived (bool): True to include received invites.
"""
username = str(username)
for invite in self.pendingInvites(includeSent, includeReceived):
if (invite.username and invite.email and invite.id and username.lower() in
(invite.username.lower(), invite.email.lower(), str(invite.id))):
return invite
raise NotFound('Unable to find invite %s' % username)
def pendingInvites(self, includeSent=True, includeReceived=True):
""" Returns a list of all :class:`~plexapi.myplex.MyPlexInvite` objects connected to your account.
Note: This includes all pending invites sent from your account and received to your account.
Parameters:
includeSent (bool): True to include sent invites.
includeReceived (bool): True to include received invites.
"""
invites = []
if includeSent:
elem = self.query(MyPlexInvite.REQUESTED)
invites += self.findItems(elem, cls=MyPlexInvite)
if includeReceived:
elem = self.query(MyPlexInvite.REQUESTS)
invites += self.findItems(elem, cls=MyPlexInvite)
return invites
def _getSectionIds(self, server, sections):
""" Converts a list of section objects or names to sectionIds needed for library sharing. """
@ -731,10 +788,10 @@ class MyPlexUser(PlexObject):
protected (False): Unknown (possibly SSL enabled?).
recommendationsPlaylistId (str): Unknown.
restricted (str): Unknown.
servers (List<:class:`~plexapi.myplex.<MyPlexServerShare`>)): Servers shared with the user.
thumb (str): Link to the users avatar.
title (str): Seems to be an aliad for username.
username (str): User's username.
servers: Servers shared between user and friend
"""
TAG = 'User'
key = 'https://plex.tv/api/users/'
@ -796,6 +853,43 @@ class MyPlexUser(PlexObject):
return hist
class MyPlexInvite(PlexObject):
""" This object represents pending friend invites.
Attributes:
TAG (str): 'Invite'
createdAt (datetime): Datetime the user was invited.
email (str): User's email address (user@gmail.com).
friend (bool): True or False if the user is invited as a friend.
friendlyName (str): The user's friendly name.
home (bool): True or False if the user is invited to a Plex Home.
id (int): User's Plex account ID.
server (bool): True or False if the user is invited to any servers.
servers (List<:class:`~plexapi.myplex.<MyPlexServerShare`>)): Servers shared with the user.
thumb (str): Link to the users avatar.
username (str): User's username.
"""
TAG = 'Invite'
REQUESTS = 'https://plex.tv/api/invites/requests'
REQUESTED = 'https://plex.tv/api/invites/requested'
def _loadData(self, data):
""" Load attribute values from Plex XML response. """
self._data = data
self.createdAt = utils.toDatetime(data.attrib.get('createdAt'))
self.email = data.attrib.get('email')
self.friend = utils.cast(bool, data.attrib.get('friend'))
self.friendlyName = data.attrib.get('friendlyName')
self.home = utils.cast(bool, data.attrib.get('home'))
self.id = utils.cast(int, data.attrib.get('id'))
self.server = utils.cast(bool, data.attrib.get('server'))
self.servers = self.findItems(data, MyPlexServerShare)
self.thumb = data.attrib.get('thumb')
self.username = data.attrib.get('username', '')
for server in self.servers:
server.accountID = self.id
class Section(PlexObject):
""" This refers to a shared section. The raw xml for the data presented here
can be found at: https://plex.tv/api/servers/{machineId}/shared_servers

View file

@ -1,4 +1,5 @@
# -*- coding: utf-8 -*-
import os
from urllib.parse import quote_plus
from plexapi import media, utils, video
@ -107,34 +108,21 @@ class Photoalbum(PlexPartialObject, ArtMixin, PosterMixin, RatingMixin):
""" Alias to :func:`~plexapi.photo.Photoalbum.photo`. """
return self.episode(title)
def iterParts(self):
""" Iterates over the parts of the media item. """
for album in self.albums():
for photo in album.photos():
for part in photo.iterParts():
yield part
def download(self, savepath=None, keep_original_name=False, showstatus=False):
""" Download photo files to specified directory.
def download(self, savepath=None, keep_original_name=False, subfolders=False):
""" Download all photos and clips from the photo ablum. See :func:`~plexapi.base.Playable.download` for details.
Parameters:
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original file name otherwise
a friendlier is generated.
showstatus(bool): Display a progressbar.
keep_original_name (bool): True to keep the original filename otherwise
a friendlier filename is generated.
subfolders (bool): True to separate photos/clips in to photo album folders.
"""
filepaths = []
locations = [i for i in self.iterParts() if i]
for location in locations:
name = location.file
if not keep_original_name:
title = self.title.replace(' ', '.')
name = '%s.%s' % (title, location.container)
url = self._server.url('%s?download=1' % location.key)
filepath = utils.download(url, self._server._token, filename=name, showstatus=showstatus,
savepath=savepath, session=self._server._session)
if filepath:
filepaths.append(filepath)
for album in self.albums():
_savepath = os.path.join(savepath, album.title) if subfolders else savepath
filepaths += album.download(_savepath, keep_original_name)
for photo in self.photos() + self.clips():
filepaths += photo.download(savepath, keep_original_name)
return filepaths
def _getWebURL(self, base=None):
@ -218,6 +206,12 @@ class Photo(PlexPartialObject, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixi
self.userRating = utils.cast(float, data.attrib.get('userRating'))
self.year = utils.cast(int, data.attrib.get('year'))
def _prettyfilename(self):
""" Returns a filename for use in download. """
if self.parentTitle:
return '%s - %s' % (self.parentTitle, self.title)
return self.title
def photoalbum(self):
""" Return the photo's :class:`~plexapi.photo.Photoalbum`. """
return self.fetchItem(self.parentKey)
@ -241,12 +235,6 @@ class Photo(PlexPartialObject, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixi
"""
return [part.file for item in self.media for part in item.parts if part]
def iterParts(self):
""" Iterates over the parts of the media item. """
for item in self.media:
for part in item.parts:
yield part
def sync(self, resolution, client=None, clientId=None, limit=None, title=None):
""" Add current photo as sync item for specified device.
See :func:`~plexapi.myplex.MyPlexAccount.sync` for possible exceptions.
@ -283,29 +271,6 @@ class Photo(PlexPartialObject, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixi
return myplex.sync(sync_item, client=client, clientId=clientId)
def download(self, savepath=None, keep_original_name=False, showstatus=False):
""" Download photo files to specified directory.
Parameters:
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original file name otherwise
a friendlier is generated.
showstatus(bool): Display a progressbar.
"""
filepaths = []
locations = [i for i in self.iterParts() if i]
for location in locations:
name = location.file
if not keep_original_name:
title = self.title.replace(' ', '.')
name = '%s.%s' % (title, location.container)
url = self._server.url('%s?download=1' % location.key)
filepath = utils.download(url, self._server._token, filename=name, showstatus=showstatus,
savepath=savepath, session=self._server._session)
if filepath:
filepaths.append(filepath)
return filepaths
def _getWebURL(self, base=None):
""" Get the Plex Web URL with the correct parameters. """
return self._server._buildWebURL(base=base, endpoint='details', key=self.parentKey, legacy=1)

View file

@ -734,21 +734,46 @@ class PlexServer(PlexObject):
notifier.start()
return notifier
def transcodeImage(self, media, height, width, opacity=100, saturation=100):
""" Returns the URL for a transcoded image from the specified media object.
Returns None if no media specified (needed if user tries to pass thumb
or art directly).
def transcodeImage(self, imageUrl, height, width,
opacity=None, saturation=None, blur=None, background=None,
minSize=True, upscale=True, imageFormat=None):
""" Returns the URL for a transcoded image.
Parameters:
imageUrl (str): The URL to the image
(eg. returned by :func:`~plexapi.mixins.PosterUrlMixin.thumbUrl`
or :func:`~plexapi.mixins.ArtUrlMixin.artUrl`).
The URL can be an online image.
height (int): Height to transcode the image to.
width (int): Width to transcode the image to.
opacity (int): Opacity of the resulting image (possibly deprecated).
saturation (int): Saturating of the resulting image.
opacity (int, optional): Change the opacity of the image (0 to 100)
saturation (int, optional): Change the saturation of the image (0 to 100).
blur (int, optional): The blur to apply to the image in pixels (e.g. 3).
background (str, optional): The background hex colour to apply behind the opacity (e.g. '000000').
minSize (bool, optional): Maintain smallest dimension. Default True.
upscale (bool, optional): Upscale the image if required. Default True.
imageFormat (str, optional): 'jpeg' (default) or 'png'.
"""
if media:
transcode_url = '/photo/:/transcode?height=%s&width=%s&opacity=%s&saturation=%s&url=%s' % (
height, width, opacity, saturation, media)
return self.url(transcode_url, includeToken=True)
params = {
'url': imageUrl,
'height': height,
'width': width,
'minSize': int(bool(minSize)),
'upscale': int(bool(upscale))
}
if opacity is not None:
params['opacity'] = opacity
if saturation is not None:
params['saturation'] = saturation
if blur is not None:
params['blur'] = blur
if background is not None:
params['background'] = str(background).strip('#')
if imageFormat is not None:
params['format'] = imageFormat.lower()
key = '/photo/:/transcode%s' % utils.joinArgs(params)
return self.url(key, includeToken=True)
def url(self, key, includeToken=None):
""" Build a URL string with proper token argument. Token will be appended to the URL

View file

@ -4,7 +4,9 @@ import functools
import logging
import os
import re
import string
import time
import unicodedata
import warnings
import zipfile
from datetime import datetime
@ -251,6 +253,13 @@ def toList(value, itemcast=None, delim=','):
return [itemcast(item) for item in value.split(delim) if item != '']
def cleanFilename(filename, replace='_'):
whitelist = "-_.()[] {}{}".format(string.ascii_letters, string.digits)
cleaned_filename = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode()
cleaned_filename = ''.join(c if c in whitelist else replace for c in cleaned_filename)
return cleaned_filename
def downloadSessionImages(server, filename=None, height=150, width=150,
opacity=100, saturation=100): # pragma: no cover
""" Helper to download a bif image or thumb.url from plex.server.sessions.

View file

@ -357,8 +357,8 @@ class Movie(Video, Playable, AdvancedSettingsMixin, ArtMixin, PosterMixin, Ratin
return any(part.hasPreviewThumbnails for media in self.media for part in media.parts)
def _prettyfilename(self):
# This is just for compat.
return self.title
""" Returns a filename for use in download. """
return '%s (%s)' % (self.title, self.year)
def reviews(self):
""" Returns a list of :class:`~plexapi.media.Review` objects. """
@ -375,32 +375,6 @@ class Movie(Video, Playable, AdvancedSettingsMixin, ArtMixin, PosterMixin, Ratin
data = self._server.query(self._details_key)
return self.findItems(data, library.Hub, rtag='Related')
def download(self, savepath=None, keep_original_name=False, **kwargs):
""" Download video files to specified directory.
Parameters:
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original file name otherwise
a friendlier is generated.
**kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`.
"""
filepaths = []
locations = [i for i in self.iterParts() if i]
for location in locations:
name = location.file
if not keep_original_name:
title = self.title.replace(' ', '.')
name = '%s.%s' % (title, location.container)
if kwargs is not None:
url = self.getStreamURL(**kwargs)
else:
self._server.url('%s?download=1' % location.key)
filepath = utils.download(url, self._server._token, filename=name,
savepath=savepath, session=self._server._session)
if filepath:
filepaths.append(filepath)
return filepaths
@utils.registerPlexObject
class Show(Video, AdvancedSettingsMixin, ArtMixin, BannerMixin, PosterMixin, RatingMixin, SplitMergeMixin, UnmatchMatchMixin,
@ -582,18 +556,20 @@ class Show(Video, AdvancedSettingsMixin, ArtMixin, BannerMixin, PosterMixin, Rat
""" Returns list of unwatched :class:`~plexapi.video.Episode` objects. """
return self.episodes(viewCount=0)
def download(self, savepath=None, keep_original_name=False, **kwargs):
""" Download video files to specified directory.
def download(self, savepath=None, keep_original_name=False, subfolders=False, **kwargs):
""" Download all episodes from the show. See :func:`~plexapi.base.Playable.download` for details.
Parameters:
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original file name otherwise
a friendlier is generated.
keep_original_name (bool): True to keep the original filename otherwise
a friendlier filename is generated.
subfolders (bool): True to separate episodes in to season folders.
**kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`.
"""
filepaths = []
for episode in self.episodes():
filepaths += episode.download(savepath, keep_original_name, **kwargs)
_savepath = os.path.join(savepath, 'Season %s' % str(episode.seasonNumber).zfill(2)) if subfolders else savepath
filepaths += episode.download(_savepath, keep_original_name, **kwargs)
return filepaths
@ -714,12 +690,12 @@ class Season(Video, ArtMixin, PosterMixin, RatingMixin, CollectionMixin):
return self.episodes(viewCount=0)
def download(self, savepath=None, keep_original_name=False, **kwargs):
""" Download video files to specified directory.
""" Download all episodes from the season. See :func:`~plexapi.base.Playable.download` for details.
Parameters:
savepath (str): Defaults to current working dir.
keep_original_name (bool): True to keep the original file name otherwise
a friendlier is generated.
keep_original_name (bool): True to keep the original filename otherwise
a friendlier filename is generated.
**kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`.
"""
filepaths = []
@ -839,8 +815,8 @@ class Episode(Video, Playable, ArtMixin, PosterMixin, RatingMixin,
] if p])
def _prettyfilename(self):
""" Returns a human friendly filename. """
return '%s.%s' % (self.grandparentTitle.replace(' ', '.'), self.seasonEpisode)
""" Returns a filename for use in download. """
return '%s - %s - %s' % (self.grandparentTitle, self.seasonEpisode, self.title)
@property
def actors(self):
@ -953,6 +929,7 @@ class Clip(Video, Playable, ArtUrlMixin, PosterUrlMixin):
return [part.file for part in self.iterParts() if part]
def _prettyfilename(self):
""" Returns a filename for use in download. """
return self.title
@ -968,4 +945,5 @@ class Extra(Clip):
self.librarySectionTitle = parent.librarySectionTitle
def _prettyfilename(self):
""" Returns a filename for use in download. """
return '%s (%s)' % (self.title, self.subtype)