diff --git a/lib/charset_normalizer/__init__.py b/lib/charset_normalizer/__init__.py index ed525034..1aea851a 100644 --- a/lib/charset_normalizer/__init__.py +++ b/lib/charset_normalizer/__init__.py @@ -19,6 +19,8 @@ at . :copyright: (c) 2021 by Ahmed TAHRI :license: MIT, see LICENSE for more details. """ +import logging + from .api import from_bytes, from_fp, from_path, normalize from .legacy import ( CharsetDetector, @@ -28,6 +30,7 @@ from .legacy import ( detect, ) from .models import CharsetMatch, CharsetMatches +from .utils import set_logging_handler from .version import VERSION, __version__ __all__ = ( @@ -44,4 +47,10 @@ __all__ = ( "CharsetDoctor", "__version__", "VERSION", + "set_logging_handler", ) + +# Attach a NullHandler to the top level logger by default +# https://docs.python.org/3.3/howto/logging.html#configuring-logging-for-a-library + +logging.getLogger("charset_normalizer").addHandler(logging.NullHandler()) diff --git a/lib/charset_normalizer/api.py b/lib/charset_normalizer/api.py index dce7cf30..80e608b4 100644 --- a/lib/charset_normalizer/api.py +++ b/lib/charset_normalizer/api.py @@ -1,3 +1,4 @@ +import logging from os.path import basename, splitext from typing import BinaryIO, List, Optional, Set @@ -6,8 +7,6 @@ try: except ImportError: # pragma: no cover PathLike = str # type: ignore -import logging - from .cd import ( coherence_ratio, encoding_languages, @@ -27,11 +26,10 @@ from .utils import ( ) logger = logging.getLogger("charset_normalizer") -logger.setLevel(logging.DEBUG) - -handler = logging.StreamHandler() -handler.setFormatter(logging.Formatter("%(asctime)s | %(levelname)s | %(message)s")) -logger.addHandler(handler) +explain_handler = logging.StreamHandler() +explain_handler.setFormatter( + logging.Formatter("%(asctime)s | %(levelname)s | %(message)s") +) def from_bytes( @@ -57,6 +55,9 @@ def from_bytes( purpose. This function will strip the SIG in the payload/sequence every time except on UTF-16, UTF-32. + By default the library does not setup any handler other than the NullHandler, if you choose to set the 'explain' + toggle to True it will alter the logger configuration to add a StreamHandler that is suitable for debugging. + Custom logging format and handler can be set manually. """ if not isinstance(sequences, (bytearray, bytes)): @@ -66,10 +67,8 @@ def from_bytes( ) ) - if not explain: - logger.setLevel(logging.CRITICAL) - else: - logger.setLevel(logging.INFO) + if explain: + logger.addHandler(explain_handler) length = len(sequences) # type: int @@ -77,6 +76,8 @@ def from_bytes( logger.warning( "Given content is empty, stopping the process very early, returning empty utf_8 str match" ) + if explain: + logger.removeHandler(explain_handler) return CharsetMatches([CharsetMatch(sequences, "utf_8", 0.0, False, [], "")]) if cp_isolation is not None: @@ -131,7 +132,7 @@ def from_bytes( prioritized_encodings = [] # type: List[str] specified_encoding = ( - any_specified_encoding(sequences) if preemptive_behaviour is True else None + any_specified_encoding(sequences) if preemptive_behaviour else None ) # type: Optional[str] if specified_encoding is not None: @@ -185,7 +186,7 @@ def from_bytes( encoding_iana ) # type: bool - if encoding_iana in {"utf_16", "utf_32"} and bom_or_sig_available is False: + if encoding_iana in {"utf_16", "utf_32"} and not bom_or_sig_available: logger.info( "Encoding %s wont be tested as-is because it require a BOM. Will try some sub-encoder LE/BE.", encoding_iana, @@ -241,7 +242,7 @@ def from_bytes( continue r_ = range( - 0 if bom_or_sig_available is False else len(sig_payload), + 0 if not bom_or_sig_available else len(sig_payload), length, int(length / steps), ) @@ -261,29 +262,40 @@ def from_bytes( max_chunk_gave_up = int(len(r_) / 4) # type: int - if max_chunk_gave_up < 2: - max_chunk_gave_up = 2 - + max_chunk_gave_up = max(max_chunk_gave_up, 2) early_stop_count = 0 # type: int md_chunks = [] # type: List[str] md_ratios = [] for i in r_: + if i + chunk_size > length + 8: + continue + cut_sequence = sequences[i : i + chunk_size] if bom_or_sig_available and strip_sig_or_bom is False: cut_sequence = sig_payload + cut_sequence - chunk = cut_sequence.decode(encoding_iana, errors="ignore") # type: str + try: + chunk = cut_sequence.decode( + encoding_iana, + errors="ignore" if is_multi_byte_decoder else "strict", + ) # type: str + except UnicodeDecodeError as e: # Lazy str loading may have missed something there + logger.warning( + "LazyStr Loading: After MD chunk decode, code page %s does not fit given bytes sequence at ALL. %s", + encoding_iana, + str(e), + ) + early_stop_count = max_chunk_gave_up + break # multi-byte bad cutting detector and adjustment # not the cleanest way to perform that fix but clever enough for now. if is_multi_byte_decoder and i > 0 and sequences[i] >= 0x80: - chunk_partial_size_chk = ( - 16 if chunk_size > 16 else chunk_size - ) # type: int + chunk_partial_size_chk = min(chunk_size, 16) # type: int if ( decoded_payload @@ -312,11 +324,9 @@ def from_bytes( ): break - if md_ratios: - mean_mess_ratio = sum(md_ratios) / len(md_ratios) # type: float - else: - mean_mess_ratio = 0.0 - + mean_mess_ratio = ( + sum(md_ratios) / len(md_ratios) if md_ratios else 0.0 + ) # type: float if mean_mess_ratio >= threshold or early_stop_count >= max_chunk_gave_up: tested_but_soft_failure.append(encoding_iana) logger.warning( @@ -375,6 +385,20 @@ def from_bytes( ) ) + # We might want to check the sequence again with the whole content + # Only if initial MD/CD tests passes + if is_too_large_sequence and not is_multi_byte_decoder: + try: + sequences[int(50e3) :].decode(encoding_iana, errors="strict") + except UnicodeDecodeError as e: + logger.warning( + "LazyStr Loading: After final lookup, code page %s does not fit given bytes sequence at ALL. %s", + encoding_iana, + str(e), + ) + tested_but_hard_failure.append(encoding_iana) + continue + results.append( CharsetMatch( sequences, @@ -393,6 +417,8 @@ def from_bytes( logger.info( "%s is most likely the one. Stopping the process.", encoding_iana ) + if explain: + logger.removeHandler(explain_handler) return CharsetMatches([results[encoding_iana]]) if encoding_iana == sig_encoding: @@ -400,6 +426,8 @@ def from_bytes( "%s is most likely the one as we detected a BOM or SIG within the beginning of the sequence.", encoding_iana, ) + if explain: + logger.removeHandler(explain_handler) return CharsetMatches([results[encoding_iana]]) if len(results) == 0: @@ -428,6 +456,9 @@ def from_bytes( logger.warning("ascii will be used as a fallback match") results.append(fallback_ascii) + if explain: + logger.removeHandler(explain_handler) + return results diff --git a/lib/charset_normalizer/cd.py b/lib/charset_normalizer/cd.py index a4512fbb..8429a0eb 100644 --- a/lib/charset_normalizer/cd.py +++ b/lib/charset_normalizer/cd.py @@ -5,7 +5,7 @@ from functools import lru_cache from typing import Dict, List, Optional, Tuple from .assets import FREQUENCIES -from .constant import KO_NAMES, TOO_SMALL_SEQUENCE, ZH_NAMES +from .constant import KO_NAMES, LANGUAGE_SUPPORTED_COUNT, TOO_SMALL_SEQUENCE, ZH_NAMES from .md import is_suspiciously_successive_range from .models import CoherenceMatches from .utils import ( @@ -110,6 +110,23 @@ def mb_encoding_languages(iana_name: str) -> List[str]: return [] +@lru_cache(maxsize=LANGUAGE_SUPPORTED_COUNT) +def get_target_features(language: str) -> Tuple[bool, bool]: + """ + Determine main aspects from a supported language if it contains accents and if is pure Latin. + """ + target_have_accents = False # type: bool + target_pure_latin = True # type: bool + + for character in FREQUENCIES[language]: + if not target_have_accents and is_accentuated(character): + target_have_accents = True + if target_pure_latin and is_latin(character) is False: + target_pure_latin = False + + return target_have_accents, target_pure_latin + + def alphabet_languages( characters: List[str], ignore_non_latin: bool = False ) -> List[str]: @@ -118,23 +135,11 @@ def alphabet_languages( """ languages = [] # type: List[Tuple[str, float]] - source_have_accents = False # type: bool - - for character in characters: - if is_accentuated(character): - source_have_accents = True - break + source_have_accents = any(is_accentuated(character) for character in characters) for language, language_characters in FREQUENCIES.items(): - target_have_accents = False # type: bool - target_pure_latin = True # type: bool - - for language_character in language_characters: - if target_have_accents is False and is_accentuated(language_character): - target_have_accents = True - if target_pure_latin is True and is_latin(language_character) is False: - target_pure_latin = False + target_have_accents, target_pure_latin = get_target_features(language) if ignore_non_latin and target_pure_latin is False: continue @@ -263,8 +268,6 @@ def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches: The return type is the same as coherence_ratio. """ per_language_ratios = OrderedDict() # type: Dict[str, List[float]] - merge = [] # type: CoherenceMatches - for result in results: for sub_result in result: language, ratio = sub_result @@ -273,17 +276,16 @@ def merge_coherence_ratios(results: List[CoherenceMatches]) -> CoherenceMatches: continue per_language_ratios[language].append(ratio) - for language in per_language_ratios: - merge.append( - ( - language, - round( - sum(per_language_ratios[language]) - / len(per_language_ratios[language]), - 4, - ), - ) + merge = [ + ( + language, + round( + sum(per_language_ratios[language]) / len(per_language_ratios[language]), + 4, + ), ) + for language in per_language_ratios + ] return sorted(merge, key=lambda x: x[1], reverse=True) @@ -298,14 +300,11 @@ def coherence_ratio( """ results = [] # type: List[Tuple[str, float]] - lg_inclusion_list = [] # type: List[str] ignore_non_latin = False # type: bool sufficient_match_count = 0 # type: int - if lg_inclusion is not None: - lg_inclusion_list = lg_inclusion.split(",") - + lg_inclusion_list = lg_inclusion.split(",") if lg_inclusion is not None else [] if "Latin Based" in lg_inclusion_list: ignore_non_latin = True lg_inclusion_list.remove("Latin Based") @@ -314,7 +313,7 @@ def coherence_ratio( sequence_frequencies = Counter(layer) # type: Counter most_common = sequence_frequencies.most_common() - character_count = sum([o for c, o in most_common]) # type: int + character_count = sum(o for c, o in most_common) # type: int if character_count <= TOO_SMALL_SEQUENCE: continue diff --git a/lib/charset_normalizer/cli/normalizer.py b/lib/charset_normalizer/cli/normalizer.py index f1911259..5f912c92 100644 --- a/lib/charset_normalizer/cli/normalizer.py +++ b/lib/charset_normalizer/cli/normalizer.py @@ -235,20 +235,19 @@ def cli_detect(argv: List[str] = None) -> int: o_.insert(-1, best_guess.encoding) if my_file.closed is False: my_file.close() - else: - if ( - args.force is False - and query_yes_no( - 'Are you sure to normalize "{}" by replacing it ?'.format( - my_file.name - ), - "no", - ) - is False - ): - if my_file.closed is False: - my_file.close() - continue + elif ( + args.force is False + and query_yes_no( + 'Are you sure to normalize "{}" by replacing it ?'.format( + my_file.name + ), + "no", + ) + is False + ): + if my_file.closed is False: + my_file.close() + continue try: x_[0].unicode_path = abspath("./{}".format(".".join(o_))) @@ -277,7 +276,7 @@ def cli_detect(argv: List[str] = None) -> int: print( ", ".join( [ - el.encoding if el.encoding else "undefined" + el.encoding or "undefined" for el in x_ if el.path == abspath(my_file.name) ] diff --git a/lib/charset_normalizer/constant.py b/lib/charset_normalizer/constant.py index 2e5974d9..3d5d6457 100644 --- a/lib/charset_normalizer/constant.py +++ b/lib/charset_normalizer/constant.py @@ -4,6 +4,8 @@ from encodings.aliases import aliases from re import IGNORECASE, compile as re_compile from typing import Dict, List, Set, Union +from .assets import FREQUENCIES + # Contain for each eligible encoding a list of/item bytes SIG/BOM ENCODING_MARKS = OrderedDict( [ @@ -30,7 +32,7 @@ TOO_BIG_SEQUENCE = int(10e6) # type: int UTF8_MAXIMAL_ALLOCATION = 1112064 # type: int UNICODE_RANGES_COMBINED = { - "Control character": range(0, 31 + 1), + "Control character": range(31 + 1), "Basic Latin": range(32, 127 + 1), "Latin-1 Supplement": range(128, 255 + 1), "Latin Extended-A": range(256, 383 + 1), @@ -311,6 +313,7 @@ UNICODE_RANGES_COMBINED = { "Variation Selectors Supplement": range(917760, 917999 + 1), } # type: Dict[str, range] + UNICODE_SECONDARY_RANGE_KEYWORD = [ "Supplement", "Extended", @@ -352,11 +355,10 @@ IANA_SUPPORTED_SIMILAR = { "cp1140": ["cp037", "cp1026", "cp273", "cp500"], "cp1250": ["iso8859_2"], "cp1251": ["kz1048", "ptcp154"], - "cp1252": ["cp1258", "iso8859_15", "iso8859_9", "latin_1"], + "cp1252": ["iso8859_15", "iso8859_9", "latin_1"], "cp1253": ["iso8859_7"], - "cp1254": ["cp1258", "iso8859_15", "iso8859_9", "latin_1"], + "cp1254": ["iso8859_15", "iso8859_9", "latin_1"], "cp1257": ["iso8859_13"], - "cp1258": ["cp1252", "cp1254", "iso8859_9", "latin_1"], "cp273": ["cp037", "cp1026", "cp1140", "cp500"], "cp437": ["cp850", "cp858", "cp860", "cp861", "cp862", "cp863", "cp865"], "cp500": ["cp037", "cp1026", "cp1140", "cp273"], @@ -494,3 +496,5 @@ KO_NAMES = {"johab", "cp949", "euc_kr"} # type: Set[str] ZH_NAMES = {"big5", "cp950", "big5hkscs", "hz"} # type: Set[str] NOT_PRINTABLE_PATTERN = re_compile(r"[0-9\W\n\r\t]+") + +LANGUAGE_SUPPORTED_COUNT = len(FREQUENCIES) # type: int diff --git a/lib/charset_normalizer/md.py b/lib/charset_normalizer/md.py index 2146d61d..b55e95c4 100644 --- a/lib/charset_normalizer/md.py +++ b/lib/charset_normalizer/md.py @@ -40,11 +40,11 @@ class MessDetectorPlugin: """ raise NotImplementedError # pragma: nocover - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover """ Permit to reset the plugin to the initial state. """ - raise NotImplementedError # pragma: nocover + raise NotImplementedError @property def ratio(self) -> float: @@ -85,7 +85,7 @@ class TooManySymbolOrPunctuationPlugin(MessDetectorPlugin): self._last_printable_char = character - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._punctuation_count = 0 self._character_count = 0 self._symbol_count = 0 @@ -116,7 +116,7 @@ class TooManyAccentuatedPlugin(MessDetectorPlugin): if is_accentuated(character): self._accentuated_count += 1 - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._character_count = 0 self._accentuated_count = 0 @@ -147,7 +147,7 @@ class UnprintablePlugin(MessDetectorPlugin): self._unprintable_count += 1 self._character_count += 1 - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._unprintable_count = 0 @property @@ -170,18 +170,19 @@ class SuspiciousDuplicateAccentPlugin(MessDetectorPlugin): def feed(self, character: str) -> None: self._character_count += 1 - if self._last_latin_character is not None: - if is_accentuated(character) and is_accentuated(self._last_latin_character): - if character.isupper() and self._last_latin_character.isupper(): - self._successive_count += 1 - # Worse if its the same char duplicated with different accent. - if remove_accent(character) == remove_accent( - self._last_latin_character - ): - self._successive_count += 1 + if ( + self._last_latin_character is not None + and is_accentuated(character) + and is_accentuated(self._last_latin_character) + ): + if character.isupper() and self._last_latin_character.isupper(): + self._successive_count += 1 + # Worse if its the same char duplicated with different accent. + if remove_accent(character) == remove_accent(self._last_latin_character): + self._successive_count += 1 self._last_latin_character = character - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._successive_count = 0 self._character_count = 0 self._last_latin_character = None @@ -228,7 +229,7 @@ class SuspiciousRange(MessDetectorPlugin): self._last_printable_seen = character - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._character_count = 0 self._suspicious_successive_range_count = 0 self._last_printable_seen = None @@ -252,6 +253,8 @@ class SuperWeirdWordPlugin(MessDetectorPlugin): def __init__(self) -> None: self._word_count = 0 # type: int self._bad_word_count = 0 # type: int + self._foreign_long_count = 0 # type: int + self._is_current_word_bad = False # type: bool self._foreign_long_watch = False # type: bool @@ -271,7 +274,7 @@ class SuperWeirdWordPlugin(MessDetectorPlugin): self._buffer_accent_count += 1 if ( self._foreign_long_watch is False - and is_latin(character) is False + and (is_latin(character) is False or is_accentuated(character)) and is_cjk(character) is False and is_hangul(character) is False and is_katakana(character) is False @@ -290,9 +293,16 @@ class SuperWeirdWordPlugin(MessDetectorPlugin): self._character_count += buffer_length - if buffer_length >= 4 and self._buffer_accent_count / buffer_length > 0.34: - self._is_current_word_bad = True + if buffer_length >= 4: + if self._buffer_accent_count / buffer_length > 0.34: + self._is_current_word_bad = True + # Word/Buffer ending with a upper case accentuated letter are so rare, + # that we will consider them all as suspicious. Same weight as foreign_long suspicious. + if is_accentuated(self._buffer[-1]) and self._buffer[-1].isupper(): + self._foreign_long_count += 1 + self._is_current_word_bad = True if buffer_length >= 24 and self._foreign_long_watch: + self._foreign_long_count += 1 self._is_current_word_bad = True if self._is_current_word_bad: @@ -311,7 +321,7 @@ class SuperWeirdWordPlugin(MessDetectorPlugin): self._is_current_word_bad = True self._buffer += character - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._buffer = "" self._is_current_word_bad = False self._foreign_long_watch = False @@ -319,10 +329,11 @@ class SuperWeirdWordPlugin(MessDetectorPlugin): self._word_count = 0 self._character_count = 0 self._bad_character_count = 0 + self._foreign_long_count = 0 @property def ratio(self) -> float: - if self._word_count <= 10: + if self._word_count <= 10 and self._foreign_long_count == 0: return 0.0 return self._bad_character_count / self._character_count @@ -342,13 +353,13 @@ class CjkInvalidStopPlugin(MessDetectorPlugin): return True def feed(self, character: str) -> None: - if character in ["丅", "丄"]: + if character in {"丅", "丄"}: self._wrong_stop_count += 1 return if is_cjk(character): self._cjk_character_count += 1 - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._wrong_stop_count = 0 self._cjk_character_count = 0 @@ -418,7 +429,7 @@ class ArchaicUpperLowerPlugin(MessDetectorPlugin): self._character_count_since_last_sep += 1 self._last_alpha_seen = character - def reset(self) -> None: + def reset(self) -> None: # pragma: no cover self._character_count = 0 self._character_count_since_last_sep = 0 self._successive_upper_lower_count = 0 @@ -453,6 +464,13 @@ def is_suspiciously_successive_range( if "Emoticons" in unicode_range_a or "Emoticons" in unicode_range_b: return False + # Latin characters can be accompanied with a combining diacritical mark + # eg. Vietnamese. + if ("Latin" in unicode_range_a or "Latin" in unicode_range_b) and ( + "Combining" in unicode_range_a or "Combining" in unicode_range_b + ): + return False + keywords_range_a, keywords_range_b = unicode_range_a.split( " " ), unicode_range_b.split(" ") @@ -472,11 +490,12 @@ def is_suspiciously_successive_range( ), unicode_range_b in ("Hiragana", "Katakana"), ) - if range_a_jp_chars or range_b_jp_chars: - if "CJK" in unicode_range_a or "CJK" in unicode_range_b: - return False - if range_a_jp_chars and range_b_jp_chars: - return False + if (range_a_jp_chars or range_b_jp_chars) and ( + "CJK" in unicode_range_a or "CJK" in unicode_range_b + ): + return False + if range_a_jp_chars and range_b_jp_chars: + return False if "Hangul" in unicode_range_a or "Hangul" in unicode_range_b: if "CJK" in unicode_range_a or "CJK" in unicode_range_b: @@ -509,7 +528,7 @@ def mess_ratio( md_class() for md_class in MessDetectorPlugin.__subclasses__() ] # type: List[MessDetectorPlugin] - length = len(decoded_sequence) # type: int + length = len(decoded_sequence) + 1 # type: int mean_mess_ratio = 0.0 # type: float @@ -520,7 +539,7 @@ def mess_ratio( else: intermediary_mean_mess_ratio_calc = 128 - for character, index in zip(decoded_sequence, range(0, length)): + for character, index in zip(decoded_sequence + "\n", range(length)): for detector in detectors: if detector.eligible(character): detector.feed(character) @@ -528,7 +547,7 @@ def mess_ratio( if ( index > 0 and index % intermediary_mean_mess_ratio_calc == 0 ) or index == length - 1: - mean_mess_ratio = sum([dt.ratio for dt in detectors]) + mean_mess_ratio = sum(dt.ratio for dt in detectors) if mean_mess_ratio >= maximum_threshold: break diff --git a/lib/charset_normalizer/models.py b/lib/charset_normalizer/models.py index 68c27b89..c38da31f 100644 --- a/lib/charset_normalizer/models.py +++ b/lib/charset_normalizer/models.py @@ -284,8 +284,7 @@ class CharsetMatches: self._results = sorted(results) if results else [] # type: List[CharsetMatch] def __iter__(self) -> Iterator[CharsetMatch]: - for result in self._results: - yield result + yield from self._results def __getitem__(self, item: Union[int, str]) -> CharsetMatch: """ diff --git a/lib/charset_normalizer/utils.py b/lib/charset_normalizer/utils.py index b9d12784..dcb14dfe 100644 --- a/lib/charset_normalizer/utils.py +++ b/lib/charset_normalizer/utils.py @@ -4,6 +4,7 @@ except ImportError: import unicodedata # type: ignore[no-redef] import importlib +import logging from codecs import IncrementalDecoder from encodings.aliases import aliases from functools import lru_cache @@ -122,7 +123,7 @@ def is_emoticon(character: str) -> bool: @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION) def is_separator(character: str) -> bool: - if character.isspace() or character in ["|", "+", ",", ";", "<", ">"]: + if character.isspace() or character in {"|", "+", ",", ";", "<", ">"}: return True character_category = unicodedata.category(character) # type: str @@ -138,7 +139,7 @@ def is_case_variable(character: str) -> bool: def is_private_use_only(character: str) -> bool: character_category = unicodedata.category(character) # type: str - return "Co" == character_category + return character_category == "Co" @lru_cache(maxsize=UTF8_MAXIMAL_ALLOCATION) @@ -193,11 +194,7 @@ def is_thai(character: str) -> bool: @lru_cache(maxsize=len(UNICODE_RANGES_COMBINED)) def is_unicode_range_secondary(range_name: str) -> bool: - for keyword in UNICODE_SECONDARY_RANGE_KEYWORD: - if keyword in range_name: - return True - - return False + return any(keyword in range_name for keyword in UNICODE_SECONDARY_RANGE_KEYWORD) def any_specified_encoding(sequence: bytes, search_zone: int = 4096) -> Optional[str]: @@ -211,9 +208,7 @@ def any_specified_encoding(sequence: bytes, search_zone: int = 4096) -> Optional results = findall( RE_POSSIBLE_ENCODING_INDICATION, - sequence[: seq_len if seq_len <= search_zone else search_zone].decode( - "ascii", errors="ignore" - ), + sequence[: min(seq_len, search_zone)].decode("ascii", errors="ignore"), ) # type: List[str] if len(results) == 0: @@ -278,7 +273,7 @@ def iana_name(cp_name: str, strict: bool = True) -> str: cp_name = cp_name.lower().replace("-", "_") for encoding_alias, encoding_iana in aliases.items(): - if cp_name == encoding_alias or cp_name == encoding_iana: + if cp_name in [encoding_alias, encoding_iana]: return encoding_iana if strict: @@ -314,7 +309,7 @@ def cp_similarity(iana_name_a: str, iana_name_b: str) -> float: character_match_count = 0 # type: int - for i in range(0, 255): + for i in range(255): to_be_decoded = bytes([i]) # type: bytes if id_a.decode(to_be_decoded) == id_b.decode(to_be_decoded): character_match_count += 1 @@ -331,3 +326,17 @@ def is_cp_similar(iana_name_a: str, iana_name_b: str) -> bool: iana_name_a in IANA_SUPPORTED_SIMILAR and iana_name_b in IANA_SUPPORTED_SIMILAR[iana_name_a] ) + + +def set_logging_handler( + name: str = "charset_normalizer", + level: int = logging.INFO, + format_string: str = "%(asctime)s | %(levelname)s | %(message)s", +) -> None: + + logger = logging.getLogger(name) + logger.setLevel(level) + + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter(format_string)) + logger.addHandler(handler) diff --git a/lib/charset_normalizer/version.py b/lib/charset_normalizer/version.py index 98e53fb3..d48da8ab 100644 --- a/lib/charset_normalizer/version.py +++ b/lib/charset_normalizer/version.py @@ -2,5 +2,5 @@ Expose version """ -__version__ = "2.0.7" +__version__ = "2.0.8" VERSION = __version__.split(".") diff --git a/lib/plexapi/alert.py b/lib/plexapi/alert.py index 9e0310fd..79ecc445 100644 --- a/lib/plexapi/alert.py +++ b/lib/plexapi/alert.py @@ -29,14 +29,18 @@ class AlertListener(threading.Thread): callback (func): Callback function to call on received messages. The callback function will be sent a single argument 'data' which will contain a dictionary of data received from the server. :samp:`def my_callback(data): ...` + callbackError (func): Callback function to call on errors. The callback function + will be sent a single argument 'error' which will contain the Error object. + :samp:`def my_callback(error): ...` """ key = '/:/websockets/notifications' - def __init__(self, server, callback=None): + def __init__(self, server, callback=None, callbackError=None): super(AlertListener, self).__init__() self.daemon = True self._server = server self._callback = callback + self._callbackError = callbackError self._ws = None def run(self): @@ -84,4 +88,9 @@ class AlertListener(threading.Thread): This is to support compatibility with current and previous releases of websocket-client. """ err = args[-1] - log.error('AlertListener Error: %s', err) + try: + log.error('AlertListener Error: %s', err) + if self._callbackError: + self._callbackError(err) + except Exception as err: # pragma: no cover + log.error('AlertListener Error: Error: %s', err) diff --git a/lib/plexapi/audio.py b/lib/plexapi/audio.py index 20a70ffa..656b9250 100644 --- a/lib/plexapi/audio.py +++ b/lib/plexapi/audio.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import os from urllib.parse import quote_plus from plexapi import library, media, utils @@ -205,23 +206,20 @@ class Artist(Audio, AdvancedSettingsMixin, ArtMixin, PosterMixin, RatingMixin, S """ Alias of :func:`~plexapi.audio.Artist.track`. """ return self.track(title, album, track) - def download(self, savepath=None, keep_original_name=False, **kwargs): - """ Downloads all tracks for the artist to the specified location. + def download(self, savepath=None, keep_original_name=False, subfolders=False, **kwargs): + """ Download all tracks from the artist. See :func:`~plexapi.base.Playable.download` for details. Parameters: - savepath (str): Title of the track to return. - keep_original_name (bool): Set True to keep the original filename as stored in - the Plex server. False will create a new filename with the format - " - ". - kwargs (dict): If specified, a :func:`~plexapi.audio.Track.getStreamURL` will - be returned and the additional arguments passed in will be sent to that - function. If kwargs is not specified, the media items will be downloaded - and saved to disk. + savepath (str): Defaults to current working dir. + keep_original_name (bool): True to keep the original filename otherwise + a friendlier filename is generated. + subfolders (bool): True to separate tracks in to album folders. + **kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`. """ filepaths = [] - for album in self.albums(): - for track in album.tracks(): - filepaths += track.download(savepath, keep_original_name, **kwargs) + for track in self.tracks(): + _savepath = os.path.join(savepath, track.parentTitle) if subfolders else savepath + filepaths += track.download(_savepath, keep_original_name, **kwargs) return filepaths @@ -314,17 +312,13 @@ class Album(Audio, ArtMixin, PosterMixin, RatingMixin, UnmatchMatchMixin, return self.fetchItem(self.parentKey) def download(self, savepath=None, keep_original_name=False, **kwargs): - """ Downloads all tracks for the artist to the specified location. + """ Download all tracks from the album. See :func:`~plexapi.base.Playable.download` for details. Parameters: - savepath (str): Title of the track to return. - keep_original_name (bool): Set True to keep the original filename as stored in - the Plex server. False will create a new filename with the format - " - ". - kwargs (dict): If specified, a :func:`~plexapi.audio.Track.getStreamURL` will - be returned and the additional arguments passed in will be sent to that - function. If kwargs is not specified, the media items will be downloaded - and saved to disk. + savepath (str): Defaults to current working dir. + keep_original_name (bool): True to keep the original filename otherwise + a friendlier filename is generated. + **kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`. """ filepaths = [] for track in self.tracks(): @@ -398,7 +392,8 @@ class Track(Audio, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixin, def _prettyfilename(self): """ Returns a filename for use in download. """ - return '%s - %s %s' % (self.grandparentTitle, self.parentTitle, self.title) + return '%s - %s - %s - %s' % ( + self.grandparentTitle, self.parentTitle, str(self.trackNumber).zfill(2), self.title) def album(self): """ Return the track's :class:`~plexapi.audio.Album`. """ diff --git a/lib/plexapi/base.py b/lib/plexapi/base.py index ab1bef44..94bfe2ad 100644 --- a/lib/plexapi/base.py +++ b/lib/plexapi/base.py @@ -681,34 +681,50 @@ class Playable(object): client.playMedia(self) def download(self, savepath=None, keep_original_name=False, **kwargs): - """ Downloads this items media to the specified location. Returns a list of + """ Downloads the media item to the specified location. Returns a list of filepaths that have been saved to disk. Parameters: - savepath (str): Title of the track to return. - keep_original_name (bool): Set True to keep the original filename as stored in - the Plex server. False will create a new filename with the format - " - ". - kwargs (dict): If specified, a :func:`~plexapi.audio.Track.getStreamURL` will - be returned and the additional arguments passed in will be sent to that - function. If kwargs is not specified, the media items will be downloaded - and saved to disk. + savepath (str): Defaults to current working dir. + keep_original_name (bool): True to keep the original filename otherwise + a friendlier filename is generated. See filenames below. + **kwargs (dict): Additional options passed into :func:`~plexapi.audio.Track.getStreamURL` + to download a transcoded stream, otherwise the media item will be downloaded + as-is and saved to disk. + + **Filenames** + + * Movie: `` (<year>)`` + * Episode: ``<show title> - s00e00 - <episode title>`` + * Track: ``<artist title> - <album title> - 00 - <track title>`` + * Photo: ``<photoalbum title> - <photo/clip title>`` or ``<photo/clip title>`` """ filepaths = [] - locations = [i for i in self.iterParts() if i] - for location in locations: - filename = location.file - if keep_original_name is False: - filename = '%s.%s' % (self._prettyfilename(), location.container) - # So this seems to be a alot slower but allows transcode. + parts = [i for i in self.iterParts() if i] + + for part in parts: + if not keep_original_name: + filename = utils.cleanFilename('%s.%s' % (self._prettyfilename(), part.container)) + else: + filename = part.file + if kwargs: + # So this seems to be a alot slower but allows transcode. download_url = self.getStreamURL(**kwargs) else: - download_url = self._server.url('%s?download=1' % location.key) - filepath = utils.download(download_url, self._server._token, filename=filename, - savepath=savepath, session=self._server._session) + download_url = self._server.url('%s?download=1' % part.key) + + filepath = utils.download( + download_url, + self._server._token, + filename=filename, + savepath=savepath, + session=self._server._session + ) + if filepath: filepaths.append(filepath) + return filepaths def stop(self, reason=''): diff --git a/lib/plexapi/const.py b/lib/plexapi/const.py index cb518feb..61c96c0b 100644 --- a/lib/plexapi/const.py +++ b/lib/plexapi/const.py @@ -3,7 +3,7 @@ # Library version MAJOR_VERSION = 4 -MINOR_VERSION = 7 -PATCH_VERSION = 2 +MINOR_VERSION = 8 +PATCH_VERSION = 0 __short_version__ = f"{MAJOR_VERSION}.{MINOR_VERSION}" __version__ = f"{__short_version__}.{PATCH_VERSION}" diff --git a/lib/plexapi/library.py b/lib/plexapi/library.py index f62d5913..12ae407f 100644 --- a/lib/plexapi/library.py +++ b/lib/plexapi/library.py @@ -26,47 +26,61 @@ class Library(PlexObject): def _loadData(self, data): self._data = data - self._sectionsByID = {} # cached Section UUIDs self.identifier = data.attrib.get('identifier') self.mediaTagVersion = data.attrib.get('mediaTagVersion') self.title1 = data.attrib.get('title1') self.title2 = data.attrib.get('title2') + self._sectionsByID = {} # cached sections by key + self._sectionsByTitle = {} # cached sections by title + + def _loadSections(self): + """ Loads and caches all the library sections. """ + key = '/library/sections' + self._sectionsByID = {} + self._sectionsByTitle = {} + for elem in self._server.query(key): + for cls in (MovieSection, ShowSection, MusicSection, PhotoSection): + if elem.attrib.get('type') == cls.TYPE: + section = cls(self._server, elem, key) + self._sectionsByID[section.key] = section + self._sectionsByTitle[section.title.lower()] = section def sections(self): """ Returns a list of all media sections in this library. Library sections may be any of :class:`~plexapi.library.MovieSection`, :class:`~plexapi.library.ShowSection`, :class:`~plexapi.library.MusicSection`, :class:`~plexapi.library.PhotoSection`. """ - key = '/library/sections' - sections = [] - for elem in self._server.query(key): - for cls in (MovieSection, ShowSection, MusicSection, PhotoSection): - if elem.attrib.get('type') == cls.TYPE: - section = cls(self._server, elem, key) - self._sectionsByID[section.key] = section - sections.append(section) - return sections + self._loadSections() + return list(self._sectionsByID.values()) - def section(self, title=None): + def section(self, title): """ Returns the :class:`~plexapi.library.LibrarySection` that matches the specified title. Parameters: title (str): Title of the section to return. """ - for section in self.sections(): - if section.title.lower() == title.lower(): - return section - raise NotFound('Invalid library section: %s' % title) + if not self._sectionsByTitle or title not in self._sectionsByTitle: + self._loadSections() + try: + return self._sectionsByTitle[title.lower()] + except KeyError: + raise NotFound('Invalid library section: %s' % title) from None def sectionByID(self, sectionID): """ Returns the :class:`~plexapi.library.LibrarySection` that matches the specified sectionID. Parameters: sectionID (int): ID of the section to return. + + Raises: + :exc:`~plexapi.exceptions.NotFound`: The library section ID is not found on the server. """ if not self._sectionsByID or sectionID not in self._sectionsByID: - self.sections() - return self._sectionsByID[sectionID] + self._loadSections() + try: + return self._sectionsByID[sectionID] + except KeyError: + raise NotFound('Invalid library sectionID: %s' % sectionID) from None def all(self, **kwargs): """ Returns a list of all media from all library sections. @@ -356,6 +370,9 @@ class LibrarySection(PlexObject): self._filterTypes = None self._fieldTypes = None self._totalViewSize = None + self._totalSize = None + self._totalDuration = None + self._totalStorage = None def fetchItems(self, ekey, cls=None, container_start=None, container_size=None, **kwargs): """ Load the specified key to find and build all items with the specified tag @@ -394,7 +411,36 @@ class LibrarySection(PlexObject): @property def totalSize(self): """ Returns the total number of items in the library for the default library type. """ - return self.totalViewSize(includeCollections=False) + if self._totalSize is None: + self._totalSize = self.totalViewSize(includeCollections=False) + return self._totalSize + + @property + def totalDuration(self): + """ Returns the total duration (in milliseconds) of items in the library. """ + if self._totalDuration is None: + self._getTotalDurationStorage() + return self._totalDuration + + @property + def totalStorage(self): + """ Returns the total storage (in bytes) of items in the library. """ + if self._totalStorage is None: + self._getTotalDurationStorage() + return self._totalStorage + + def _getTotalDurationStorage(self): + """ Queries the Plex server for the total library duration and storage and caches the values. """ + data = self._server.query('/media/providers?includeStorage=1') + xpath = ( + './MediaProvider[@identifier="com.plexapp.plugins.library"]' + '/Feature[@type="content"]' + '/Directory[@id="%s"]' + ) % self.key + directory = next(iter(data.findall(xpath)), None) + if directory: + self._totalDuration = utils.cast(int, directory.attrib.get('durationTotal')) + self._totalStorage = utils.cast(int, directory.attrib.get('storageTotal')) def totalViewSize(self, libtype=None, includeCollections=True): """ Returns the total number of items in the library for a specified libtype. @@ -432,8 +478,12 @@ class LibrarySection(PlexObject): log.error(msg) raise - def reload(self, key=None): - return self._server.library.section(self.title) + def reload(self): + """ Reload the data for the library section. """ + self._server.library._loadSections() + newLibrary = self._server.library.sectionByID(self.key) + self.__dict__.update(newLibrary.__dict__) + return self def edit(self, agent=None, **kwargs): """ Edit a library (Note: agent is required). See :class:`~plexapi.library.Library` for example usage. @@ -446,11 +496,6 @@ class LibrarySection(PlexObject): part = '/library/sections/%s?agent=%s&%s' % (self.key, agent, urlencode(kwargs)) self._server.query(part, method=self._server._session.put) - # Reload this way since the self.key dont have a full path, but is simply a id. - for s in self._server.library.sections(): - if s.key == self.key: - return s - def get(self, title): """ Returns the media item with the specified title. diff --git a/lib/plexapi/media.py b/lib/plexapi/media.py index 82766e77..95385c4a 100644 --- a/lib/plexapi/media.py +++ b/lib/plexapi/media.py @@ -79,13 +79,16 @@ class Media(PlexObject): self.make = data.attrib.get('make') self.model = data.attrib.get('model') + parent = self._parent() + self._parentKey = parent.key + @property def isOptimizedVersion(self): """ Returns True if the media is a Plex optimized version. """ return self.proxyType == utils.SEARCHTYPES['optimizedVersion'] def delete(self): - part = self._initpath + '/media/%s' % self.id + part = '%s/media/%s' % (self._parentKey, self.id) try: return self._server.query(part, method=self._server._session.delete) except BadRequest: diff --git a/lib/plexapi/myplex.py b/lib/plexapi/myplex.py index 15da2115..312050a9 100644 --- a/lib/plexapi/myplex.py +++ b/lib/plexapi/myplex.py @@ -70,9 +70,6 @@ class MyPlexAccount(PlexObject): PLEXSERVERS = 'https://plex.tv/api/servers/{machineId}' # get FRIENDUPDATE = 'https://plex.tv/api/friends/{userId}' # put with args, delete REMOVEHOMEUSER = 'https://plex.tv/api/home/users/{userId}' # delete - REMOVEINVITE = 'https://plex.tv/api/invites/requested/{userId}?friend=1&server=1&home=1' # delete - REQUESTED = 'https://plex.tv/api/invites/requested' # get - REQUESTS = 'https://plex.tv/api/invites/requests' # get SIGNIN = 'https://plex.tv/users/sign_in.xml' # get with auth WEBHOOKS = 'https://plex.tv/api/v2/user/webhooks' # get, post with data OPTOUTS = 'https://plex.tv/api/v2/user/%(userUUID)s/settings/opt_outs' # get @@ -365,26 +362,55 @@ class MyPlexAccount(PlexObject): return self.query(url, self._session.post, headers=headers) def removeFriend(self, user): - """ Remove the specified user from all sharing. + """ Remove the specified user from your friends. Parameters: - user (str): MyPlexUser, username, email of the user to be added. + user (str): :class:`~plexapi.myplex.MyPlexUser`, username, or email of the user to be removed. """ - user = self.user(user) - url = self.FRIENDUPDATE if user.friend else self.REMOVEINVITE - url = url.format(userId=user.id) + user = user if isinstance(user, MyPlexUser) else self.user(user) + url = self.FRIENDUPDATE.format(userId=user.id) return self.query(url, self._session.delete) def removeHomeUser(self, user): - """ Remove the specified managed user from home. + """ Remove the specified user from your home users. Parameters: - user (str): MyPlexUser, username, email of the user to be removed from home. + user (str): :class:`~plexapi.myplex.MyPlexUser`, username, or email of the user to be removed. """ - user = self.user(user) + user = user if isinstance(user, MyPlexUser) else self.user(user) url = self.REMOVEHOMEUSER.format(userId=user.id) return self.query(url, self._session.delete) + def acceptInvite(self, user): + """ Accept a pending firend invite from the specified user. + + Parameters: + user (str): :class:`~plexapi.myplex.MyPlexInvite`, username, or email of the friend invite to accept. + """ + invite = user if isinstance(user, MyPlexInvite) else self.pendingInvite(user, includeSent=False) + params = { + 'friend': int(invite.friend), + 'home': int(invite.home), + 'server': int(invite.server) + } + url = MyPlexInvite.REQUESTS + '/%s' % invite.id + utils.joinArgs(params) + return self.query(url, self._session.put) + + def cancelInvite(self, user): + """ Cancel a pending firend invite for the specified user. + + Parameters: + user (str): :class:`~plexapi.myplex.MyPlexInvite`, username, or email of the friend invite to cancel. + """ + invite = user if isinstance(user, MyPlexInvite) else self.pendingInvite(user, includeReceived=False) + params = { + 'friend': int(invite.friend), + 'home': int(invite.home), + 'server': int(invite.server) + } + url = MyPlexInvite.REQUESTED + '/%s' % invite.id + utils.joinArgs(params) + return self.query(url, self._session.delete) + def updateFriend(self, user, server, sections=None, removeSections=False, allowSync=None, allowCameraUpload=None, allowChannels=None, filterMovies=None, filterTelevision=None, filterMusic=None): """ Update the specified user's share settings. @@ -455,7 +481,7 @@ class MyPlexAccount(PlexObject): return response_servers, response_filters def user(self, username): - """ Returns the :class:`~plexapi.myplex.MyPlexUser` that matches the email or username specified. + """ Returns the :class:`~plexapi.myplex.MyPlexUser` that matches the specified username or email. Parameters: username (str): Username, email or id of the user to return. @@ -467,19 +493,50 @@ class MyPlexAccount(PlexObject): return user elif (user.username and user.email and user.id and username.lower() in - (user.username.lower(), user.email.lower(), str(user.id))): + (user.username.lower(), user.email.lower(), str(user.id))): return user raise NotFound('Unable to find user %s' % username) def users(self): """ Returns a list of all :class:`~plexapi.myplex.MyPlexUser` objects connected to your account. - This includes both friends and pending invites. You can reference the user.friend to - distinguish between the two. """ - friends = [MyPlexUser(self, elem) for elem in self.query(MyPlexUser.key)] - requested = [MyPlexUser(self, elem, self.REQUESTED) for elem in self.query(self.REQUESTED)] - return friends + requested + elem = self.query(MyPlexUser.key) + return self.findItems(elem, cls=MyPlexUser) + + def pendingInvite(self, username, includeSent=True, includeReceived=True): + """ Returns the :class:`~plexapi.myplex.MyPlexInvite` that matches the specified username or email. + Note: This can be a pending invite sent from your account or received to your account. + + Parameters: + username (str): Username, email or id of the user to return. + includeSent (bool): True to include sent invites. + includeReceived (bool): True to include received invites. + """ + username = str(username) + for invite in self.pendingInvites(includeSent, includeReceived): + if (invite.username and invite.email and invite.id and username.lower() in + (invite.username.lower(), invite.email.lower(), str(invite.id))): + return invite + + raise NotFound('Unable to find invite %s' % username) + + def pendingInvites(self, includeSent=True, includeReceived=True): + """ Returns a list of all :class:`~plexapi.myplex.MyPlexInvite` objects connected to your account. + Note: This includes all pending invites sent from your account and received to your account. + + Parameters: + includeSent (bool): True to include sent invites. + includeReceived (bool): True to include received invites. + """ + invites = [] + if includeSent: + elem = self.query(MyPlexInvite.REQUESTED) + invites += self.findItems(elem, cls=MyPlexInvite) + if includeReceived: + elem = self.query(MyPlexInvite.REQUESTS) + invites += self.findItems(elem, cls=MyPlexInvite) + return invites def _getSectionIds(self, server, sections): """ Converts a list of section objects or names to sectionIds needed for library sharing. """ @@ -731,10 +788,10 @@ class MyPlexUser(PlexObject): protected (False): Unknown (possibly SSL enabled?). recommendationsPlaylistId (str): Unknown. restricted (str): Unknown. + servers (List<:class:`~plexapi.myplex.<MyPlexServerShare`>)): Servers shared with the user. thumb (str): Link to the users avatar. title (str): Seems to be an aliad for username. username (str): User's username. - servers: Servers shared between user and friend """ TAG = 'User' key = 'https://plex.tv/api/users/' @@ -796,6 +853,43 @@ class MyPlexUser(PlexObject): return hist +class MyPlexInvite(PlexObject): + """ This object represents pending friend invites. + + Attributes: + TAG (str): 'Invite' + createdAt (datetime): Datetime the user was invited. + email (str): User's email address (user@gmail.com). + friend (bool): True or False if the user is invited as a friend. + friendlyName (str): The user's friendly name. + home (bool): True or False if the user is invited to a Plex Home. + id (int): User's Plex account ID. + server (bool): True or False if the user is invited to any servers. + servers (List<:class:`~plexapi.myplex.<MyPlexServerShare`>)): Servers shared with the user. + thumb (str): Link to the users avatar. + username (str): User's username. + """ + TAG = 'Invite' + REQUESTS = 'https://plex.tv/api/invites/requests' + REQUESTED = 'https://plex.tv/api/invites/requested' + + def _loadData(self, data): + """ Load attribute values from Plex XML response. """ + self._data = data + self.createdAt = utils.toDatetime(data.attrib.get('createdAt')) + self.email = data.attrib.get('email') + self.friend = utils.cast(bool, data.attrib.get('friend')) + self.friendlyName = data.attrib.get('friendlyName') + self.home = utils.cast(bool, data.attrib.get('home')) + self.id = utils.cast(int, data.attrib.get('id')) + self.server = utils.cast(bool, data.attrib.get('server')) + self.servers = self.findItems(data, MyPlexServerShare) + self.thumb = data.attrib.get('thumb') + self.username = data.attrib.get('username', '') + for server in self.servers: + server.accountID = self.id + + class Section(PlexObject): """ This refers to a shared section. The raw xml for the data presented here can be found at: https://plex.tv/api/servers/{machineId}/shared_servers diff --git a/lib/plexapi/photo.py b/lib/plexapi/photo.py index 6a60db81..c24d7fb1 100644 --- a/lib/plexapi/photo.py +++ b/lib/plexapi/photo.py @@ -1,4 +1,5 @@ # -*- coding: utf-8 -*- +import os from urllib.parse import quote_plus from plexapi import media, utils, video @@ -107,34 +108,21 @@ class Photoalbum(PlexPartialObject, ArtMixin, PosterMixin, RatingMixin): """ Alias to :func:`~plexapi.photo.Photoalbum.photo`. """ return self.episode(title) - def iterParts(self): - """ Iterates over the parts of the media item. """ - for album in self.albums(): - for photo in album.photos(): - for part in photo.iterParts(): - yield part - - def download(self, savepath=None, keep_original_name=False, showstatus=False): - """ Download photo files to specified directory. + def download(self, savepath=None, keep_original_name=False, subfolders=False): + """ Download all photos and clips from the photo ablum. See :func:`~plexapi.base.Playable.download` for details. Parameters: savepath (str): Defaults to current working dir. - keep_original_name (bool): True to keep the original file name otherwise - a friendlier is generated. - showstatus(bool): Display a progressbar. + keep_original_name (bool): True to keep the original filename otherwise + a friendlier filename is generated. + subfolders (bool): True to separate photos/clips in to photo album folders. """ filepaths = [] - locations = [i for i in self.iterParts() if i] - for location in locations: - name = location.file - if not keep_original_name: - title = self.title.replace(' ', '.') - name = '%s.%s' % (title, location.container) - url = self._server.url('%s?download=1' % location.key) - filepath = utils.download(url, self._server._token, filename=name, showstatus=showstatus, - savepath=savepath, session=self._server._session) - if filepath: - filepaths.append(filepath) + for album in self.albums(): + _savepath = os.path.join(savepath, album.title) if subfolders else savepath + filepaths += album.download(_savepath, keep_original_name) + for photo in self.photos() + self.clips(): + filepaths += photo.download(savepath, keep_original_name) return filepaths def _getWebURL(self, base=None): @@ -218,6 +206,12 @@ class Photo(PlexPartialObject, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixi self.userRating = utils.cast(float, data.attrib.get('userRating')) self.year = utils.cast(int, data.attrib.get('year')) + def _prettyfilename(self): + """ Returns a filename for use in download. """ + if self.parentTitle: + return '%s - %s' % (self.parentTitle, self.title) + return self.title + def photoalbum(self): """ Return the photo's :class:`~plexapi.photo.Photoalbum`. """ return self.fetchItem(self.parentKey) @@ -241,12 +235,6 @@ class Photo(PlexPartialObject, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixi """ return [part.file for item in self.media for part in item.parts if part] - def iterParts(self): - """ Iterates over the parts of the media item. """ - for item in self.media: - for part in item.parts: - yield part - def sync(self, resolution, client=None, clientId=None, limit=None, title=None): """ Add current photo as sync item for specified device. See :func:`~plexapi.myplex.MyPlexAccount.sync` for possible exceptions. @@ -283,29 +271,6 @@ class Photo(PlexPartialObject, Playable, ArtUrlMixin, PosterUrlMixin, RatingMixi return myplex.sync(sync_item, client=client, clientId=clientId) - def download(self, savepath=None, keep_original_name=False, showstatus=False): - """ Download photo files to specified directory. - - Parameters: - savepath (str): Defaults to current working dir. - keep_original_name (bool): True to keep the original file name otherwise - a friendlier is generated. - showstatus(bool): Display a progressbar. - """ - filepaths = [] - locations = [i for i in self.iterParts() if i] - for location in locations: - name = location.file - if not keep_original_name: - title = self.title.replace(' ', '.') - name = '%s.%s' % (title, location.container) - url = self._server.url('%s?download=1' % location.key) - filepath = utils.download(url, self._server._token, filename=name, showstatus=showstatus, - savepath=savepath, session=self._server._session) - if filepath: - filepaths.append(filepath) - return filepaths - def _getWebURL(self, base=None): """ Get the Plex Web URL with the correct parameters. """ return self._server._buildWebURL(base=base, endpoint='details', key=self.parentKey, legacy=1) diff --git a/lib/plexapi/server.py b/lib/plexapi/server.py index 5c0beaed..ab359b3f 100644 --- a/lib/plexapi/server.py +++ b/lib/plexapi/server.py @@ -734,21 +734,46 @@ class PlexServer(PlexObject): notifier.start() return notifier - def transcodeImage(self, media, height, width, opacity=100, saturation=100): - """ Returns the URL for a transcoded image from the specified media object. - Returns None if no media specified (needed if user tries to pass thumb - or art directly). + def transcodeImage(self, imageUrl, height, width, + opacity=None, saturation=None, blur=None, background=None, + minSize=True, upscale=True, imageFormat=None): + """ Returns the URL for a transcoded image. Parameters: + imageUrl (str): The URL to the image + (eg. returned by :func:`~plexapi.mixins.PosterUrlMixin.thumbUrl` + or :func:`~plexapi.mixins.ArtUrlMixin.artUrl`). + The URL can be an online image. height (int): Height to transcode the image to. width (int): Width to transcode the image to. - opacity (int): Opacity of the resulting image (possibly deprecated). - saturation (int): Saturating of the resulting image. + opacity (int, optional): Change the opacity of the image (0 to 100) + saturation (int, optional): Change the saturation of the image (0 to 100). + blur (int, optional): The blur to apply to the image in pixels (e.g. 3). + background (str, optional): The background hex colour to apply behind the opacity (e.g. '000000'). + minSize (bool, optional): Maintain smallest dimension. Default True. + upscale (bool, optional): Upscale the image if required. Default True. + imageFormat (str, optional): 'jpeg' (default) or 'png'. """ - if media: - transcode_url = '/photo/:/transcode?height=%s&width=%s&opacity=%s&saturation=%s&url=%s' % ( - height, width, opacity, saturation, media) - return self.url(transcode_url, includeToken=True) + params = { + 'url': imageUrl, + 'height': height, + 'width': width, + 'minSize': int(bool(minSize)), + 'upscale': int(bool(upscale)) + } + if opacity is not None: + params['opacity'] = opacity + if saturation is not None: + params['saturation'] = saturation + if blur is not None: + params['blur'] = blur + if background is not None: + params['background'] = str(background).strip('#') + if imageFormat is not None: + params['format'] = imageFormat.lower() + + key = '/photo/:/transcode%s' % utils.joinArgs(params) + return self.url(key, includeToken=True) def url(self, key, includeToken=None): """ Build a URL string with proper token argument. Token will be appended to the URL diff --git a/lib/plexapi/utils.py b/lib/plexapi/utils.py index 310200f6..92d64299 100644 --- a/lib/plexapi/utils.py +++ b/lib/plexapi/utils.py @@ -4,7 +4,9 @@ import functools import logging import os import re +import string import time +import unicodedata import warnings import zipfile from datetime import datetime @@ -251,6 +253,13 @@ def toList(value, itemcast=None, delim=','): return [itemcast(item) for item in value.split(delim) if item != ''] +def cleanFilename(filename, replace='_'): + whitelist = "-_.()[] {}{}".format(string.ascii_letters, string.digits) + cleaned_filename = unicodedata.normalize('NFKD', filename).encode('ASCII', 'ignore').decode() + cleaned_filename = ''.join(c if c in whitelist else replace for c in cleaned_filename) + return cleaned_filename + + def downloadSessionImages(server, filename=None, height=150, width=150, opacity=100, saturation=100): # pragma: no cover """ Helper to download a bif image or thumb.url from plex.server.sessions. diff --git a/lib/plexapi/video.py b/lib/plexapi/video.py index 090d9502..4049d6c5 100644 --- a/lib/plexapi/video.py +++ b/lib/plexapi/video.py @@ -357,8 +357,8 @@ class Movie(Video, Playable, AdvancedSettingsMixin, ArtMixin, PosterMixin, Ratin return any(part.hasPreviewThumbnails for media in self.media for part in media.parts) def _prettyfilename(self): - # This is just for compat. - return self.title + """ Returns a filename for use in download. """ + return '%s (%s)' % (self.title, self.year) def reviews(self): """ Returns a list of :class:`~plexapi.media.Review` objects. """ @@ -375,32 +375,6 @@ class Movie(Video, Playable, AdvancedSettingsMixin, ArtMixin, PosterMixin, Ratin data = self._server.query(self._details_key) return self.findItems(data, library.Hub, rtag='Related') - def download(self, savepath=None, keep_original_name=False, **kwargs): - """ Download video files to specified directory. - - Parameters: - savepath (str): Defaults to current working dir. - keep_original_name (bool): True to keep the original file name otherwise - a friendlier is generated. - **kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`. - """ - filepaths = [] - locations = [i for i in self.iterParts() if i] - for location in locations: - name = location.file - if not keep_original_name: - title = self.title.replace(' ', '.') - name = '%s.%s' % (title, location.container) - if kwargs is not None: - url = self.getStreamURL(**kwargs) - else: - self._server.url('%s?download=1' % location.key) - filepath = utils.download(url, self._server._token, filename=name, - savepath=savepath, session=self._server._session) - if filepath: - filepaths.append(filepath) - return filepaths - @utils.registerPlexObject class Show(Video, AdvancedSettingsMixin, ArtMixin, BannerMixin, PosterMixin, RatingMixin, SplitMergeMixin, UnmatchMatchMixin, @@ -582,18 +556,20 @@ class Show(Video, AdvancedSettingsMixin, ArtMixin, BannerMixin, PosterMixin, Rat """ Returns list of unwatched :class:`~plexapi.video.Episode` objects. """ return self.episodes(viewCount=0) - def download(self, savepath=None, keep_original_name=False, **kwargs): - """ Download video files to specified directory. + def download(self, savepath=None, keep_original_name=False, subfolders=False, **kwargs): + """ Download all episodes from the show. See :func:`~plexapi.base.Playable.download` for details. Parameters: savepath (str): Defaults to current working dir. - keep_original_name (bool): True to keep the original file name otherwise - a friendlier is generated. + keep_original_name (bool): True to keep the original filename otherwise + a friendlier filename is generated. + subfolders (bool): True to separate episodes in to season folders. **kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`. """ filepaths = [] for episode in self.episodes(): - filepaths += episode.download(savepath, keep_original_name, **kwargs) + _savepath = os.path.join(savepath, 'Season %s' % str(episode.seasonNumber).zfill(2)) if subfolders else savepath + filepaths += episode.download(_savepath, keep_original_name, **kwargs) return filepaths @@ -714,12 +690,12 @@ class Season(Video, ArtMixin, PosterMixin, RatingMixin, CollectionMixin): return self.episodes(viewCount=0) def download(self, savepath=None, keep_original_name=False, **kwargs): - """ Download video files to specified directory. + """ Download all episodes from the season. See :func:`~plexapi.base.Playable.download` for details. Parameters: savepath (str): Defaults to current working dir. - keep_original_name (bool): True to keep the original file name otherwise - a friendlier is generated. + keep_original_name (bool): True to keep the original filename otherwise + a friendlier filename is generated. **kwargs: Additional options passed into :func:`~plexapi.base.PlexObject.getStreamURL`. """ filepaths = [] @@ -839,8 +815,8 @@ class Episode(Video, Playable, ArtMixin, PosterMixin, RatingMixin, ] if p]) def _prettyfilename(self): - """ Returns a human friendly filename. """ - return '%s.%s' % (self.grandparentTitle.replace(' ', '.'), self.seasonEpisode) + """ Returns a filename for use in download. """ + return '%s - %s - %s' % (self.grandparentTitle, self.seasonEpisode, self.title) @property def actors(self): @@ -953,6 +929,7 @@ class Clip(Video, Playable, ArtUrlMixin, PosterUrlMixin): return [part.file for part in self.iterParts() if part] def _prettyfilename(self): + """ Returns a filename for use in download. """ return self.title @@ -968,4 +945,5 @@ class Extra(Clip): self.librarySectionTitle = parent.librarySectionTitle def _prettyfilename(self): + """ Returns a filename for use in download. """ return '%s (%s)' % (self.title, self.subtype)