Update vendored beets to 1.6.0

Updates colorama to 0.4.6
Adds confuse version 1.7.0
Updates jellyfish to 0.9.0
Adds mediafile 0.10.1
Updates munkres to 1.1.4
Updates musicbrainzngs to 0.7.1
Updates mutagen to 1.46.0
Updates pyyaml to 6.0
Updates unidecode to 1.3.6
This commit is contained in:
Labrys of Knossos 2022-11-28 18:02:40 -05:00
commit 56c6773c6b
385 changed files with 25143 additions and 18080 deletions

View file

@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
# This file is part of beets.
# Copyright 2016, Adrian Sampson.
#
@ -15,28 +14,28 @@
"""Miscellaneous utility functions."""
from __future__ import division, absolute_import, print_function
import os
import sys
import errno
import locale
import re
import tempfile
import shutil
import fnmatch
from collections import Counter
import functools
from collections import Counter, namedtuple
from multiprocessing.pool import ThreadPool
import traceback
import subprocess
import platform
import shlex
from beets.util import hidden
import six
from unidecode import unidecode
from enum import Enum
MAX_FILENAME_LENGTH = 200
WINDOWS_MAGIC_PREFIX = u'\\\\?\\'
SNI_SUPPORTED = sys.version_info >= (2, 7, 9)
WINDOWS_MAGIC_PREFIX = '\\\\?\\'
class HumanReadableException(Exception):
@ -58,27 +57,27 @@ class HumanReadableException(Exception):
self.reason = reason
self.verb = verb
self.tb = tb
super(HumanReadableException, self).__init__(self.get_message())
super().__init__(self.get_message())
def _gerund(self):
"""Generate a (likely) gerund form of the English verb.
"""
if u' ' in self.verb:
if ' ' in self.verb:
return self.verb
gerund = self.verb[:-1] if self.verb.endswith(u'e') else self.verb
gerund += u'ing'
gerund = self.verb[:-1] if self.verb.endswith('e') else self.verb
gerund += 'ing'
return gerund
def _reasonstr(self):
"""Get the reason as a string."""
if isinstance(self.reason, six.text_type):
if isinstance(self.reason, str):
return self.reason
elif isinstance(self.reason, bytes):
return self.reason.decode('utf-8', 'ignore')
elif hasattr(self.reason, 'strerror'): # i.e., EnvironmentError
return self.reason.strerror
else:
return u'"{0}"'.format(six.text_type(self.reason))
return '"{}"'.format(str(self.reason))
def get_message(self):
"""Create the human-readable description of the error, sans
@ -92,7 +91,7 @@ class HumanReadableException(Exception):
"""
if self.tb:
logger.debug(self.tb)
logger.error(u'{0}: {1}', self.error_kind, self.args[0])
logger.error('{0}: {1}', self.error_kind, self.args[0])
class FilesystemError(HumanReadableException):
@ -100,29 +99,30 @@ class FilesystemError(HumanReadableException):
via a function in this module. The `paths` field is a sequence of
pathnames involved in the operation.
"""
def __init__(self, reason, verb, paths, tb=None):
self.paths = paths
super(FilesystemError, self).__init__(reason, verb, tb)
super().__init__(reason, verb, tb)
def get_message(self):
# Use a nicer English phrasing for some specific verbs.
if self.verb in ('move', 'copy', 'rename'):
clause = u'while {0} {1} to {2}'.format(
clause = 'while {} {} to {}'.format(
self._gerund(),
displayable_path(self.paths[0]),
displayable_path(self.paths[1])
)
elif self.verb in ('delete', 'write', 'create', 'read'):
clause = u'while {0} {1}'.format(
clause = 'while {} {}'.format(
self._gerund(),
displayable_path(self.paths[0])
)
else:
clause = u'during {0} of paths {1}'.format(
self.verb, u', '.join(displayable_path(p) for p in self.paths)
clause = 'during {} of paths {}'.format(
self.verb, ', '.join(displayable_path(p) for p in self.paths)
)
return u'{0} {1}'.format(self._reasonstr(), clause)
return f'{self._reasonstr()} {clause}'
class MoveOperation(Enum):
@ -132,6 +132,8 @@ class MoveOperation(Enum):
COPY = 1
LINK = 2
HARDLINK = 3
REFLINK = 4
REFLINK_AUTO = 5
def normpath(path):
@ -182,7 +184,7 @@ def sorted_walk(path, ignore=(), ignore_hidden=False, logger=None):
contents = os.listdir(syspath(path))
except OSError as exc:
if logger:
logger.warning(u'could not list directory {0}: {1}'.format(
logger.warning('could not list directory {}: {}'.format(
displayable_path(path), exc.strerror
))
return
@ -195,6 +197,10 @@ def sorted_walk(path, ignore=(), ignore_hidden=False, logger=None):
skip = False
for pat in ignore:
if fnmatch.fnmatch(base, pat):
if logger:
logger.debug('ignoring {} due to ignore rule {}'.format(
base, pat
))
skip = True
break
if skip:
@ -217,8 +223,14 @@ def sorted_walk(path, ignore=(), ignore_hidden=False, logger=None):
for base in dirs:
cur = os.path.join(path, base)
# yield from sorted_walk(...)
for res in sorted_walk(cur, ignore, ignore_hidden, logger):
yield res
yield from sorted_walk(cur, ignore, ignore_hidden, logger)
def path_as_posix(path):
"""Return the string representation of the path with forward (/)
slashes.
"""
return path.replace(b'\\', b'/')
def mkdirall(path):
@ -229,7 +241,7 @@ def mkdirall(path):
if not os.path.isdir(syspath(ancestor)):
try:
os.mkdir(syspath(ancestor))
except (OSError, IOError) as exc:
except OSError as exc:
raise FilesystemError(exc, 'create', (ancestor,),
traceback.format_exc())
@ -282,13 +294,13 @@ def prune_dirs(path, root=None, clutter=('.DS_Store', 'Thumbs.db')):
continue
clutter = [bytestring_path(c) for c in clutter]
match_paths = [bytestring_path(d) for d in os.listdir(directory)]
if fnmatch_all(match_paths, clutter):
# Directory contains only clutter (or nothing).
try:
try:
if fnmatch_all(match_paths, clutter):
# Directory contains only clutter (or nothing).
shutil.rmtree(directory)
except OSError:
else:
break
else:
except OSError:
break
@ -367,18 +379,18 @@ def bytestring_path(path):
PATH_SEP = bytestring_path(os.sep)
def displayable_path(path, separator=u'; '):
def displayable_path(path, separator='; '):
"""Attempts to decode a bytestring path to a unicode object for the
purpose of displaying it to the user. If the `path` argument is a
list or a tuple, the elements are joined with `separator`.
"""
if isinstance(path, (list, tuple)):
return separator.join(displayable_path(p) for p in path)
elif isinstance(path, six.text_type):
elif isinstance(path, str):
return path
elif not isinstance(path, bytes):
# A non-string object: just get its unicode representation.
return six.text_type(path)
return str(path)
try:
return path.decode(_fsencoding(), 'ignore')
@ -397,7 +409,7 @@ def syspath(path, prefix=True):
if os.path.__name__ != 'ntpath':
return path
if not isinstance(path, six.text_type):
if not isinstance(path, str):
# Beets currently represents Windows paths internally with UTF-8
# arbitrarily. But earlier versions used MBCS because it is
# reported as the FS encoding by Windows. Try both.
@ -410,11 +422,11 @@ def syspath(path, prefix=True):
path = path.decode(encoding, 'replace')
# Add the magic prefix if it isn't already there.
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247.aspx
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247.aspx
if prefix and not path.startswith(WINDOWS_MAGIC_PREFIX):
if path.startswith(u'\\\\'):
if path.startswith('\\\\'):
# UNC path. Final path should look like \\?\UNC\...
path = u'UNC' + path[1:]
path = 'UNC' + path[1:]
path = WINDOWS_MAGIC_PREFIX + path
return path
@ -436,7 +448,7 @@ def remove(path, soft=True):
return
try:
os.remove(path)
except (OSError, IOError) as exc:
except OSError as exc:
raise FilesystemError(exc, 'delete', (path,), traceback.format_exc())
@ -451,10 +463,10 @@ def copy(path, dest, replace=False):
path = syspath(path)
dest = syspath(dest)
if not replace and os.path.exists(dest):
raise FilesystemError(u'file exists', 'copy', (path, dest))
raise FilesystemError('file exists', 'copy', (path, dest))
try:
shutil.copyfile(path, dest)
except (OSError, IOError) as exc:
except OSError as exc:
raise FilesystemError(exc, 'copy', (path, dest),
traceback.format_exc())
@ -467,24 +479,37 @@ def move(path, dest, replace=False):
instead, in which case metadata will *not* be preserved. Paths are
translated to system paths.
"""
if os.path.isdir(path):
raise FilesystemError(u'source is directory', 'move', (path, dest))
if os.path.isdir(dest):
raise FilesystemError(u'destination is directory', 'move',
(path, dest))
if samefile(path, dest):
return
path = syspath(path)
dest = syspath(dest)
if os.path.exists(dest) and not replace:
raise FilesystemError(u'file exists', 'rename', (path, dest))
raise FilesystemError('file exists', 'rename', (path, dest))
# First, try renaming the file.
try:
os.rename(path, dest)
os.replace(path, dest)
except OSError:
# Otherwise, copy and delete the original.
tmp = tempfile.mktemp(suffix='.beets',
prefix=py3_path(b'.' + os.path.basename(dest)),
dir=py3_path(os.path.dirname(dest)))
tmp = syspath(tmp)
try:
shutil.copyfile(path, dest)
shutil.copyfile(path, tmp)
os.replace(tmp, dest)
tmp = None
os.remove(path)
except (OSError, IOError) as exc:
except OSError as exc:
raise FilesystemError(exc, 'move', (path, dest),
traceback.format_exc())
finally:
if tmp is not None:
os.remove(tmp)
def link(path, dest, replace=False):
@ -496,18 +521,18 @@ def link(path, dest, replace=False):
return
if os.path.exists(syspath(dest)) and not replace:
raise FilesystemError(u'file exists', 'rename', (path, dest))
raise FilesystemError('file exists', 'rename', (path, dest))
try:
os.symlink(syspath(path), syspath(dest))
except NotImplementedError:
# raised on python >= 3.2 and Windows versions before Vista
raise FilesystemError(u'OS does not support symbolic links.'
raise FilesystemError('OS does not support symbolic links.'
'link', (path, dest), traceback.format_exc())
except OSError as exc:
# TODO: Windows version checks can be removed for python 3
if hasattr('sys', 'getwindowsversion'):
if sys.getwindowsversion()[0] < 6: # is before Vista
exc = u'OS does not support symbolic links.'
exc = 'OS does not support symbolic links.'
raise FilesystemError(exc, 'link', (path, dest),
traceback.format_exc())
@ -521,21 +546,50 @@ def hardlink(path, dest, replace=False):
return
if os.path.exists(syspath(dest)) and not replace:
raise FilesystemError(u'file exists', 'rename', (path, dest))
raise FilesystemError('file exists', 'rename', (path, dest))
try:
os.link(syspath(path), syspath(dest))
except NotImplementedError:
raise FilesystemError(u'OS does not support hard links.'
raise FilesystemError('OS does not support hard links.'
'link', (path, dest), traceback.format_exc())
except OSError as exc:
if exc.errno == errno.EXDEV:
raise FilesystemError(u'Cannot hard link across devices.'
raise FilesystemError('Cannot hard link across devices.'
'link', (path, dest), traceback.format_exc())
else:
raise FilesystemError(exc, 'link', (path, dest),
traceback.format_exc())
def reflink(path, dest, replace=False, fallback=False):
"""Create a reflink from `dest` to `path`.
Raise an `OSError` if `dest` already exists, unless `replace` is
True. If `path` == `dest`, then do nothing.
If reflinking fails and `fallback` is enabled, try copying the file
instead. Otherwise, raise an error without trying a plain copy.
May raise an `ImportError` if the `reflink` module is not available.
"""
import reflink as pyreflink
if samefile(path, dest):
return
if os.path.exists(syspath(dest)) and not replace:
raise FilesystemError('file exists', 'rename', (path, dest))
try:
pyreflink.reflink(path, dest)
except (NotImplementedError, pyreflink.ReflinkImpossibleError):
if fallback:
copy(path, dest, replace)
else:
raise FilesystemError('OS/filesystem does not support reflinks.',
'link', (path, dest), traceback.format_exc())
def unique_path(path):
"""Returns a version of ``path`` that does not exist on the
filesystem. Specifically, if ``path` itself already exists, then
@ -553,22 +607,23 @@ def unique_path(path):
num = 0
while True:
num += 1
suffix = u'.{}'.format(num).encode() + ext
suffix = f'.{num}'.encode() + ext
new_path = base + suffix
if not os.path.exists(new_path):
return new_path
# Note: The Windows "reserved characters" are, of course, allowed on
# Unix. They are forbidden here because they cause problems on Samba
# shares, which are sufficiently common as to cause frequent problems.
# http://msdn.microsoft.com/en-us/library/windows/desktop/aa365247.aspx
# https://msdn.microsoft.com/en-us/library/windows/desktop/aa365247.aspx
CHAR_REPLACE = [
(re.compile(r'[\\/]'), u'_'), # / and \ -- forbidden everywhere.
(re.compile(r'^\.'), u'_'), # Leading dot (hidden files on Unix).
(re.compile(r'[\x00-\x1f]'), u''), # Control characters.
(re.compile(r'[<>:"\?\*\|]'), u'_'), # Windows "reserved characters".
(re.compile(r'\.$'), u'_'), # Trailing dots.
(re.compile(r'\s+$'), u''), # Trailing whitespace.
(re.compile(r'[\\/]'), '_'), # / and \ -- forbidden everywhere.
(re.compile(r'^\.'), '_'), # Leading dot (hidden files on Unix).
(re.compile(r'[\x00-\x1f]'), ''), # Control characters.
(re.compile(r'[<>:"\?\*\|]'), '_'), # Windows "reserved characters".
(re.compile(r'\.$'), '_'), # Trailing dots.
(re.compile(r'\s+$'), ''), # Trailing whitespace.
]
@ -692,36 +747,29 @@ def py3_path(path):
it is. So this function helps us "smuggle" the true bytes data
through APIs that took Python 3's Unicode mandate too seriously.
"""
if isinstance(path, six.text_type):
if isinstance(path, str):
return path
assert isinstance(path, bytes)
if six.PY2:
return path
return os.fsdecode(path)
def str2bool(value):
"""Returns a boolean reflecting a human-entered string."""
return value.lower() in (u'yes', u'1', u'true', u't', u'y')
return value.lower() in ('yes', '1', 'true', 't', 'y')
def as_string(value):
"""Convert a value to a Unicode object for matching with a query.
None becomes the empty string. Bytestrings are silently decoded.
"""
if six.PY2:
buffer_types = buffer, memoryview # noqa: F821
else:
buffer_types = memoryview
if value is None:
return u''
elif isinstance(value, buffer_types):
return ''
elif isinstance(value, memoryview):
return bytes(value).decode('utf-8', 'ignore')
elif isinstance(value, bytes):
return value.decode('utf-8', 'ignore')
else:
return six.text_type(value)
return str(value)
def text_string(value, encoding='utf-8'):
@ -744,7 +792,7 @@ def plurality(objs):
"""
c = Counter(objs)
if not c:
raise ValueError(u'sequence must be non-empty')
raise ValueError('sequence must be non-empty')
return c.most_common(1)[0]
@ -761,7 +809,11 @@ def cpu_count():
num = 0
elif sys.platform == 'darwin':
try:
num = int(command_output(['/usr/sbin/sysctl', '-n', 'hw.ncpu']))
num = int(command_output([
'/usr/sbin/sysctl',
'-n',
'hw.ncpu',
]).stdout)
except (ValueError, OSError, subprocess.CalledProcessError):
num = 0
else:
@ -781,20 +833,23 @@ def convert_command_args(args):
assert isinstance(args, list)
def convert(arg):
if six.PY2:
if isinstance(arg, six.text_type):
arg = arg.encode(arg_encoding())
else:
if isinstance(arg, bytes):
arg = arg.decode(arg_encoding(), 'surrogateescape')
if isinstance(arg, bytes):
arg = arg.decode(arg_encoding(), 'surrogateescape')
return arg
return [convert(a) for a in args]
# stdout and stderr as bytes
CommandOutput = namedtuple("CommandOutput", ("stdout", "stderr"))
def command_output(cmd, shell=False):
"""Runs the command and returns its output after it has exited.
Returns a CommandOutput. The attributes ``stdout`` and ``stderr`` contain
byte strings of the respective output streams.
``cmd`` is a list of arguments starting with the command names. The
arguments are bytes on Unix and strings on Windows.
If ``shell`` is true, ``cmd`` is assumed to be a string and passed to a
@ -829,7 +884,7 @@ def command_output(cmd, shell=False):
cmd=' '.join(cmd),
output=stdout + stderr,
)
return stdout
return CommandOutput(stdout, stderr)
def max_filename_length(path, limit=MAX_FILENAME_LENGTH):
@ -876,25 +931,6 @@ def editor_command():
return open_anything()
def shlex_split(s):
"""Split a Unicode or bytes string according to shell lexing rules.
Raise `ValueError` if the string is not a well-formed shell string.
This is a workaround for a bug in some versions of Python.
"""
if not six.PY2 or isinstance(s, bytes): # Shlex works fine.
return shlex.split(s)
elif isinstance(s, six.text_type):
# Work around a Python bug.
# http://bugs.python.org/issue6988
bs = s.encode('utf-8')
return [c.decode('utf-8') for c in shlex.split(bs)]
else:
raise TypeError(u'shlex_split called with non-string')
def interactive_open(targets, command):
"""Open the files in `targets` by `exec`ing a new `command`, given
as a Unicode string. (The new program takes over, and Python
@ -906,7 +942,7 @@ def interactive_open(targets, command):
# Split the command string into its arguments.
try:
args = shlex_split(command)
args = shlex.split(command)
except ValueError: # Malformed shell tokens.
args = [command]
@ -921,7 +957,7 @@ def _windows_long_path_name(short_path):
"""Use Windows' `GetLongPathNameW` via ctypes to get the canonical,
long path given a short filename.
"""
if not isinstance(short_path, six.text_type):
if not isinstance(short_path, str):
short_path = short_path.decode(_fsencoding())
import ctypes
@ -982,7 +1018,7 @@ def raw_seconds_short(string):
"""
match = re.match(r'^(\d+):([0-5]\d)$', string)
if not match:
raise ValueError(u'String not in M:SS format')
raise ValueError('String not in M:SS format')
minutes, seconds = map(int, match.groups())
return float(minutes * 60 + seconds)
@ -1009,3 +1045,59 @@ def asciify_path(path, sep_replace):
sep_replace
)
return os.sep.join(path_components)
def par_map(transform, items):
"""Apply the function `transform` to all the elements in the
iterable `items`, like `map(transform, items)` but with no return
value. The map *might* happen in parallel: it's parallel on Python 3
and sequential on Python 2.
The parallelism uses threads (not processes), so this is only useful
for IO-bound `transform`s.
"""
pool = ThreadPool()
pool.map(transform, items)
pool.close()
pool.join()
def lazy_property(func):
"""A decorator that creates a lazily evaluated property. On first access,
the property is assigned the return value of `func`. This first value is
stored, so that future accesses do not have to evaluate `func` again.
This behaviour is useful when `func` is expensive to evaluate, and it is
not certain that the result will be needed.
"""
field_name = '_' + func.__name__
@property
@functools.wraps(func)
def wrapper(self):
if hasattr(self, field_name):
return getattr(self, field_name)
value = func(self)
setattr(self, field_name, value)
return value
return wrapper
def decode_commandline_path(path):
"""Prepare a path for substitution into commandline template.
On Python 3, we need to construct the subprocess commands to invoke as a
Unicode string. On Unix, this is a little unfortunate---the OS is
expecting bytes---so we use surrogate escaping and decode with the
argument encoding, which is the same encoding that will then be
*reversed* to recover the same bytes before invoking the OS. On
Windows, we want to preserve the Unicode filename "as is."
"""
# On Python 3, the template is a Unicode string, which only supports
# substitution of Unicode variables.
if platform.system() == 'Windows':
return path.decode(_fsencoding())
else:
return path.decode(arg_encoding(), 'surrogateescape')