diff --git a/lib/importlib_metadata/__init__.py b/lib/importlib_metadata/__init__.py new file mode 100644 index 00000000..9a36a8e6 --- /dev/null +++ b/lib/importlib_metadata/__init__.py @@ -0,0 +1,904 @@ +import os +import re +import abc +import csv +import sys +import zipp +import email +import pathlib +import operator +import textwrap +import warnings +import functools +import itertools +import posixpath +import collections + +from . import _adapters, _meta, _py39compat +from ._collections import FreezableDefaultDict, Pair +from ._compat import ( + NullFinder, + install, + pypy_partial, +) +from ._functools import method_cache, pass_none +from ._itertools import always_iterable, unique_everseen +from ._meta import PackageMetadata, SimplePath + +from contextlib import suppress +from importlib import import_module +from importlib.abc import MetaPathFinder +from itertools import starmap +from typing import List, Mapping, Optional + + +__all__ = [ + 'Distribution', + 'DistributionFinder', + 'PackageMetadata', + 'PackageNotFoundError', + 'distribution', + 'distributions', + 'entry_points', + 'files', + 'metadata', + 'packages_distributions', + 'requires', + 'version', +] + + +class PackageNotFoundError(ModuleNotFoundError): + """The package was not found.""" + + def __str__(self): + return f"No package metadata was found for {self.name}" + + @property + def name(self): + (name,) = self.args + return name + + +class Sectioned: + """ + A simple entry point config parser for performance + + >>> for item in Sectioned.read(Sectioned._sample): + ... print(item) + Pair(name='sec1', value='# comments ignored') + Pair(name='sec1', value='a = 1') + Pair(name='sec1', value='b = 2') + Pair(name='sec2', value='a = 2') + + >>> res = Sectioned.section_pairs(Sectioned._sample) + >>> item = next(res) + >>> item.name + 'sec1' + >>> item.value + Pair(name='a', value='1') + >>> item = next(res) + >>> item.value + Pair(name='b', value='2') + >>> item = next(res) + >>> item.name + 'sec2' + >>> item.value + Pair(name='a', value='2') + >>> list(res) + [] + """ + + _sample = textwrap.dedent( + """ + [sec1] + # comments ignored + a = 1 + b = 2 + + [sec2] + a = 2 + """ + ).lstrip() + + @classmethod + def section_pairs(cls, text): + return ( + section._replace(value=Pair.parse(section.value)) + for section in cls.read(text, filter_=cls.valid) + if section.name is not None + ) + + @staticmethod + def read(text, filter_=None): + lines = filter(filter_, map(str.strip, text.splitlines())) + name = None + for value in lines: + section_match = value.startswith('[') and value.endswith(']') + if section_match: + name = value.strip('[]') + continue + yield Pair(name, value) + + @staticmethod + def valid(line): + return line and not line.startswith('#') + + +class DeprecatedTuple: + """ + Provide subscript item access for backward compatibility. + + >>> recwarn = getfixture('recwarn') + >>> ep = EntryPoint(name='name', value='value', group='group') + >>> ep[:] + ('name', 'value', 'group') + >>> ep[0] + 'name' + >>> len(recwarn) + 1 + """ + + # Do not remove prior to 2023-05-01 or Python 3.13 + _warn = functools.partial( + warnings.warn, + "EntryPoint tuple interface is deprecated. Access members by name.", + DeprecationWarning, + stacklevel=pypy_partial(2), + ) + + def __getitem__(self, item): + self._warn() + return self._key()[item] + + +class EntryPoint(DeprecatedTuple): + """An entry point as defined by Python packaging conventions. + + See `the packaging docs on entry points + `_ + for more information. + + >>> ep = EntryPoint( + ... name=None, group=None, value='package.module:attr [extra1, extra2]') + >>> ep.module + 'package.module' + >>> ep.attr + 'attr' + >>> ep.extras + ['extra1', 'extra2'] + """ + + pattern = re.compile( + r'(?P[\w.]+)\s*' + r'(:\s*(?P[\w.]+)\s*)?' + r'((?P\[.*\])\s*)?$' + ) + """ + A regular expression describing the syntax for an entry point, + which might look like: + + - module + - package.module + - package.module:attribute + - package.module:object.attribute + - package.module:attr [extra1, extra2] + + Other combinations are possible as well. + + The expression is lenient about whitespace around the ':', + following the attr, and following any extras. + """ + + name: str + value: str + group: str + + dist: Optional['Distribution'] = None + + def __init__(self, name, value, group): + vars(self).update(name=name, value=value, group=group) + + def load(self): + """Load the entry point from its definition. If only a module + is indicated by the value, return that module. Otherwise, + return the named object. + """ + match = self.pattern.match(self.value) + module = import_module(match.group('module')) + attrs = filter(None, (match.group('attr') or '').split('.')) + return functools.reduce(getattr, attrs, module) + + @property + def module(self): + match = self.pattern.match(self.value) + return match.group('module') + + @property + def attr(self): + match = self.pattern.match(self.value) + return match.group('attr') + + @property + def extras(self): + match = self.pattern.match(self.value) + return re.findall(r'\w+', match.group('extras') or '') + + def _for(self, dist): + vars(self).update(dist=dist) + return self + + def matches(self, **params): + """ + EntryPoint matches the given parameters. + + >>> ep = EntryPoint(group='foo', name='bar', value='bing:bong [extra1, extra2]') + >>> ep.matches(group='foo') + True + >>> ep.matches(name='bar', value='bing:bong [extra1, extra2]') + True + >>> ep.matches(group='foo', name='other') + False + >>> ep.matches() + True + >>> ep.matches(extras=['extra1', 'extra2']) + True + >>> ep.matches(module='bing') + True + >>> ep.matches(attr='bong') + True + """ + attrs = (getattr(self, param) for param in params) + return all(map(operator.eq, params.values(), attrs)) + + def _key(self): + return self.name, self.value, self.group + + def __lt__(self, other): + return self._key() < other._key() + + def __eq__(self, other): + return self._key() == other._key() + + def __setattr__(self, name, value): + raise AttributeError("EntryPoint objects are immutable.") + + def __repr__(self): + return ( + f'EntryPoint(name={self.name!r}, value={self.value!r}, ' + f'group={self.group!r})' + ) + + def __hash__(self): + return hash(self._key()) + + +class EntryPoints(tuple): + """ + An immutable collection of selectable EntryPoint objects. + """ + + __slots__ = () + + def __getitem__(self, name): # -> EntryPoint: + """ + Get the EntryPoint in self matching name. + """ + try: + return next(iter(self.select(name=name))) + except StopIteration: + raise KeyError(name) + + def select(self, **params): + """ + Select entry points from self that match the + given parameters (typically group and/or name). + """ + return EntryPoints(ep for ep in self if _py39compat.ep_matches(ep, **params)) + + @property + def names(self): + """ + Return the set of all names of all entry points. + """ + return {ep.name for ep in self} + + @property + def groups(self): + """ + Return the set of all groups of all entry points. + """ + return {ep.group for ep in self} + + @classmethod + def _from_text_for(cls, text, dist): + return cls(ep._for(dist) for ep in cls._from_text(text)) + + @staticmethod + def _from_text(text): + return ( + EntryPoint(name=item.value.name, value=item.value.value, group=item.name) + for item in Sectioned.section_pairs(text or '') + ) + + +class PackagePath(pathlib.PurePosixPath): + """A reference to a path in a package""" + + def read_text(self, encoding='utf-8'): + with self.locate().open(encoding=encoding) as stream: + return stream.read() + + def read_binary(self): + with self.locate().open('rb') as stream: + return stream.read() + + def locate(self): + """Return a path-like object for this path""" + return self.dist.locate_file(self) + + +class FileHash: + def __init__(self, spec): + self.mode, _, self.value = spec.partition('=') + + def __repr__(self): + return f'' + + +class Distribution(metaclass=abc.ABCMeta): + """A Python distribution package.""" + + @abc.abstractmethod + def read_text(self, filename): + """Attempt to load metadata file given by the name. + + :param filename: The name of the file in the distribution info. + :return: The text if found, otherwise None. + """ + + @abc.abstractmethod + def locate_file(self, path): + """ + Given a path to a file in this distribution, return a path + to it. + """ + + @classmethod + def from_name(cls, name: str): + """Return the Distribution for the given package name. + + :param name: The name of the distribution package to search for. + :return: The Distribution instance (or subclass thereof) for the named + package, if found. + :raises PackageNotFoundError: When the named package's distribution + metadata cannot be found. + :raises ValueError: When an invalid value is supplied for name. + """ + if not name: + raise ValueError("A distribution name is required.") + try: + return next(cls.discover(name=name)) + except StopIteration: + raise PackageNotFoundError(name) + + @classmethod + def discover(cls, **kwargs): + """Return an iterable of Distribution objects for all packages. + + Pass a ``context`` or pass keyword arguments for constructing + a context. + + :context: A ``DistributionFinder.Context`` object. + :return: Iterable of Distribution objects for all packages. + """ + context = kwargs.pop('context', None) + if context and kwargs: + raise ValueError("cannot accept context and kwargs") + context = context or DistributionFinder.Context(**kwargs) + return itertools.chain.from_iterable( + resolver(context) for resolver in cls._discover_resolvers() + ) + + @staticmethod + def at(path): + """Return a Distribution for the indicated metadata path + + :param path: a string or path-like object + :return: a concrete Distribution instance for the path + """ + return PathDistribution(pathlib.Path(path)) + + @staticmethod + def _discover_resolvers(): + """Search the meta_path for resolvers.""" + declared = ( + getattr(finder, 'find_distributions', None) for finder in sys.meta_path + ) + return filter(None, declared) + + @property + def metadata(self) -> _meta.PackageMetadata: + """Return the parsed metadata for this Distribution. + + The returned object will have keys that name the various bits of + metadata. See PEP 566 for details. + """ + text = ( + self.read_text('METADATA') + or self.read_text('PKG-INFO') + # This last clause is here to support old egg-info files. Its + # effect is to just end up using the PathDistribution's self._path + # (which points to the egg-info file) attribute unchanged. + or self.read_text('') + ) + return _adapters.Message(email.message_from_string(text)) + + @property + def name(self): + """Return the 'Name' metadata for the distribution package.""" + return self.metadata['Name'] + + @property + def _normalized_name(self): + """Return a normalized version of the name.""" + return Prepared.normalize(self.name) + + @property + def version(self): + """Return the 'Version' metadata for the distribution package.""" + return self.metadata['Version'] + + @property + def entry_points(self): + return EntryPoints._from_text_for(self.read_text('entry_points.txt'), self) + + @property + def files(self): + """Files in this distribution. + + :return: List of PackagePath for this distribution or None + + Result is `None` if the metadata file that enumerates files + (i.e. RECORD for dist-info or SOURCES.txt for egg-info) is + missing. + Result may be empty if the metadata exists but is empty. + """ + + def make_file(name, hash=None, size_str=None): + result = PackagePath(name) + result.hash = FileHash(hash) if hash else None + result.size = int(size_str) if size_str else None + result.dist = self + return result + + @pass_none + def make_files(lines): + return list(starmap(make_file, csv.reader(lines))) + + return make_files(self._read_files_distinfo() or self._read_files_egginfo()) + + def _read_files_distinfo(self): + """ + Read the lines of RECORD + """ + text = self.read_text('RECORD') + return text and text.splitlines() + + def _read_files_egginfo(self): + """ + SOURCES.txt might contain literal commas, so wrap each line + in quotes. + """ + text = self.read_text('SOURCES.txt') + return text and map('"{}"'.format, text.splitlines()) + + @property + def requires(self): + """Generated requirements specified for this Distribution""" + reqs = self._read_dist_info_reqs() or self._read_egg_info_reqs() + return reqs and list(reqs) + + def _read_dist_info_reqs(self): + return self.metadata.get_all('Requires-Dist') + + def _read_egg_info_reqs(self): + source = self.read_text('requires.txt') + return pass_none(self._deps_from_requires_text)(source) + + @classmethod + def _deps_from_requires_text(cls, source): + return cls._convert_egg_info_reqs_to_simple_reqs(Sectioned.read(source)) + + @staticmethod + def _convert_egg_info_reqs_to_simple_reqs(sections): + """ + Historically, setuptools would solicit and store 'extra' + requirements, including those with environment markers, + in separate sections. More modern tools expect each + dependency to be defined separately, with any relevant + extras and environment markers attached directly to that + requirement. This method converts the former to the + latter. See _test_deps_from_requires_text for an example. + """ + + def make_condition(name): + return name and f'extra == "{name}"' + + def quoted_marker(section): + section = section or '' + extra, sep, markers = section.partition(':') + if extra and markers: + markers = f'({markers})' + conditions = list(filter(None, [markers, make_condition(extra)])) + return '; ' + ' and '.join(conditions) if conditions else '' + + def url_req_space(req): + """ + PEP 508 requires a space between the url_spec and the quoted_marker. + Ref python/importlib_metadata#357. + """ + # '@' is uniquely indicative of a url_req. + return ' ' * ('@' in req) + + for section in sections: + space = url_req_space(section.value) + yield section.value + space + quoted_marker(section.name) + + +class DistributionFinder(MetaPathFinder): + """ + A MetaPathFinder capable of discovering installed distributions. + """ + + class Context: + """ + Keyword arguments presented by the caller to + ``distributions()`` or ``Distribution.discover()`` + to narrow the scope of a search for distributions + in all DistributionFinders. + + Each DistributionFinder may expect any parameters + and should attempt to honor the canonical + parameters defined below when appropriate. + """ + + name = None + """ + Specific name for which a distribution finder should match. + A name of ``None`` matches all distributions. + """ + + def __init__(self, **kwargs): + vars(self).update(kwargs) + + @property + def path(self): + """ + The sequence of directory path that a distribution finder + should search. + + Typically refers to Python installed package paths such as + "site-packages" directories and defaults to ``sys.path``. + """ + return vars(self).get('path', sys.path) + + @abc.abstractmethod + def find_distributions(self, context=Context()): + """ + Find distributions. + + Return an iterable of all Distribution instances capable of + loading the metadata for packages matching the ``context``, + a DistributionFinder.Context instance. + """ + + +class FastPath: + """ + Micro-optimized class for searching a path for + children. + + >>> FastPath('').children() + ['...'] + """ + + @functools.lru_cache() # type: ignore + def __new__(cls, root): + return super().__new__(cls) + + def __init__(self, root): + self.root = root + + def joinpath(self, child): + return pathlib.Path(self.root, child) + + def children(self): + with suppress(Exception): + return os.listdir(self.root or '.') + with suppress(Exception): + return self.zip_children() + return [] + + def zip_children(self): + zip_path = zipp.Path(self.root) + names = zip_path.root.namelist() + self.joinpath = zip_path.joinpath + + return dict.fromkeys(child.split(posixpath.sep, 1)[0] for child in names) + + def search(self, name): + return self.lookup(self.mtime).search(name) + + @property + def mtime(self): + with suppress(OSError): + return os.stat(self.root).st_mtime + self.lookup.cache_clear() + + @method_cache + def lookup(self, mtime): + return Lookup(self) + + +class Lookup: + def __init__(self, path: FastPath): + base = os.path.basename(path.root).lower() + base_is_egg = base.endswith(".egg") + self.infos = FreezableDefaultDict(list) + self.eggs = FreezableDefaultDict(list) + + for child in path.children(): + low = child.lower() + if low.endswith((".dist-info", ".egg-info")): + # rpartition is faster than splitext and suitable for this purpose. + name = low.rpartition(".")[0].partition("-")[0] + normalized = Prepared.normalize(name) + self.infos[normalized].append(path.joinpath(child)) + elif base_is_egg and low == "egg-info": + name = base.rpartition(".")[0].partition("-")[0] + legacy_normalized = Prepared.legacy_normalize(name) + self.eggs[legacy_normalized].append(path.joinpath(child)) + + self.infos.freeze() + self.eggs.freeze() + + def search(self, prepared): + infos = ( + self.infos[prepared.normalized] + if prepared + else itertools.chain.from_iterable(self.infos.values()) + ) + eggs = ( + self.eggs[prepared.legacy_normalized] + if prepared + else itertools.chain.from_iterable(self.eggs.values()) + ) + return itertools.chain(infos, eggs) + + +class Prepared: + """ + A prepared search for metadata on a possibly-named package. + """ + + normalized = None + legacy_normalized = None + + def __init__(self, name): + self.name = name + if name is None: + return + self.normalized = self.normalize(name) + self.legacy_normalized = self.legacy_normalize(name) + + @staticmethod + def normalize(name): + """ + PEP 503 normalization plus dashes as underscores. + """ + return re.sub(r"[-_.]+", "-", name).lower().replace('-', '_') + + @staticmethod + def legacy_normalize(name): + """ + Normalize the package name as found in the convention in + older packaging tools versions and specs. + """ + return name.lower().replace('-', '_') + + def __bool__(self): + return bool(self.name) + + +@install +class MetadataPathFinder(NullFinder, DistributionFinder): + """A degenerate finder for distribution packages on the file system. + + This finder supplies only a find_distributions() method for versions + of Python that do not have a PathFinder find_distributions(). + """ + + def find_distributions(self, context=DistributionFinder.Context()): + """ + Find distributions. + + Return an iterable of all Distribution instances capable of + loading the metadata for packages matching ``context.name`` + (or all names if ``None`` indicated) along the paths in the list + of directories ``context.path``. + """ + found = self._search_paths(context.name, context.path) + return map(PathDistribution, found) + + @classmethod + def _search_paths(cls, name, paths): + """Find metadata directories in paths heuristically.""" + prepared = Prepared(name) + return itertools.chain.from_iterable( + path.search(prepared) for path in map(FastPath, paths) + ) + + def invalidate_caches(cls): + FastPath.__new__.cache_clear() + + +class PathDistribution(Distribution): + def __init__(self, path: SimplePath): + """Construct a distribution. + + :param path: SimplePath indicating the metadata directory. + """ + self._path = path + + def read_text(self, filename): + with suppress( + FileNotFoundError, + IsADirectoryError, + KeyError, + NotADirectoryError, + PermissionError, + ): + return self._path.joinpath(filename).read_text(encoding='utf-8') + + read_text.__doc__ = Distribution.read_text.__doc__ + + def locate_file(self, path): + return self._path.parent / path + + @property + def _normalized_name(self): + """ + Performance optimization: where possible, resolve the + normalized name from the file system path. + """ + stem = os.path.basename(str(self._path)) + return ( + pass_none(Prepared.normalize)(self._name_from_stem(stem)) + or super()._normalized_name + ) + + @staticmethod + def _name_from_stem(stem): + """ + >>> PathDistribution._name_from_stem('foo-3.0.egg-info') + 'foo' + >>> PathDistribution._name_from_stem('CherryPy-3.0.dist-info') + 'CherryPy' + >>> PathDistribution._name_from_stem('face.egg-info') + 'face' + >>> PathDistribution._name_from_stem('foo.bar') + """ + filename, ext = os.path.splitext(stem) + if ext not in ('.dist-info', '.egg-info'): + return + name, sep, rest = filename.partition('-') + return name + + +def distribution(distribution_name): + """Get the ``Distribution`` instance for the named package. + + :param distribution_name: The name of the distribution package as a string. + :return: A ``Distribution`` instance (or subclass thereof). + """ + return Distribution.from_name(distribution_name) + + +def distributions(**kwargs): + """Get all ``Distribution`` instances in the current environment. + + :return: An iterable of ``Distribution`` instances. + """ + return Distribution.discover(**kwargs) + + +def metadata(distribution_name) -> _meta.PackageMetadata: + """Get the metadata for the named package. + + :param distribution_name: The name of the distribution package to query. + :return: A PackageMetadata containing the parsed metadata. + """ + return Distribution.from_name(distribution_name).metadata + + +def version(distribution_name): + """Get the version string for the named package. + + :param distribution_name: The name of the distribution package to query. + :return: The version string for the package as defined in the package's + "Version" metadata key. + """ + return distribution(distribution_name).version + + +_unique = functools.partial( + unique_everseen, + key=_py39compat.normalized_name, +) +""" +Wrapper for ``distributions`` to return unique distributions by name. +""" + + +def entry_points(**params) -> EntryPoints: + """Return EntryPoint objects for all installed packages. + + Pass selection parameters (group or name) to filter the + result to entry points matching those properties (see + EntryPoints.select()). + + :return: EntryPoints for all installed packages. + """ + eps = itertools.chain.from_iterable( + dist.entry_points for dist in _unique(distributions()) + ) + return EntryPoints(eps).select(**params) + + +def files(distribution_name): + """Return a list of files for the named package. + + :param distribution_name: The name of the distribution package to query. + :return: List of files composing the distribution. + """ + return distribution(distribution_name).files + + +def requires(distribution_name): + """ + Return a list of requirements for the named package. + + :return: An iterator of requirements, suitable for + packaging.requirement.Requirement. + """ + return distribution(distribution_name).requires + + +def packages_distributions() -> Mapping[str, List[str]]: + """ + Return a mapping of top-level packages to their + distributions. + + >>> import collections.abc + >>> pkgs = packages_distributions() + >>> all(isinstance(dist, collections.abc.Sequence) for dist in pkgs.values()) + True + """ + pkg_to_dist = collections.defaultdict(list) + for dist in distributions(): + for pkg in _top_level_declared(dist) or _top_level_inferred(dist): + pkg_to_dist[pkg].append(dist.metadata['Name']) + return dict(pkg_to_dist) + + +def _top_level_declared(dist): + return (dist.read_text('top_level.txt') or '').split() + + +def _top_level_inferred(dist): + return { + f.parts[0] if len(f.parts) > 1 else f.with_suffix('').name + for f in always_iterable(dist.files) + if f.suffix == ".py" + } diff --git a/lib/importlib_metadata/_adapters.py b/lib/importlib_metadata/_adapters.py new file mode 100644 index 00000000..e33cba5e --- /dev/null +++ b/lib/importlib_metadata/_adapters.py @@ -0,0 +1,90 @@ +import functools +import warnings +import re +import textwrap +import email.message + +from ._text import FoldedCase +from ._compat import pypy_partial + + +# Do not remove prior to 2024-01-01 or Python 3.14 +_warn = functools.partial( + warnings.warn, + "Implicit None on return values is deprecated and will raise KeyErrors.", + DeprecationWarning, + stacklevel=pypy_partial(2), +) + + +class Message(email.message.Message): + multiple_use_keys = set( + map( + FoldedCase, + [ + 'Classifier', + 'Obsoletes-Dist', + 'Platform', + 'Project-URL', + 'Provides-Dist', + 'Provides-Extra', + 'Requires-Dist', + 'Requires-External', + 'Supported-Platform', + 'Dynamic', + ], + ) + ) + """ + Keys that may be indicated multiple times per PEP 566. + """ + + def __new__(cls, orig: email.message.Message): + res = super().__new__(cls) + vars(res).update(vars(orig)) + return res + + def __init__(self, *args, **kwargs): + self._headers = self._repair_headers() + + # suppress spurious error from mypy + def __iter__(self): + return super().__iter__() + + def __getitem__(self, item): + """ + Warn users that a ``KeyError`` can be expected when a + mising key is supplied. Ref python/importlib_metadata#371. + """ + res = super().__getitem__(item) + if res is None: + _warn() + return res + + def _repair_headers(self): + def redent(value): + "Correct for RFC822 indentation" + if not value or '\n' not in value: + return value + return textwrap.dedent(' ' * 8 + value) + + headers = [(key, redent(value)) for key, value in vars(self)['_headers']] + if self._payload: + headers.append(('Description', self.get_payload())) + return headers + + @property + def json(self): + """ + Convert PackageMetadata to a JSON-compatible format + per PEP 0566. + """ + + def transform(key): + value = self.get_all(key) if key in self.multiple_use_keys else self[key] + if key == 'Keywords': + value = re.split(r'\s+', value) + tk = key.lower().replace('-', '_') + return tk, value + + return dict(map(transform, map(FoldedCase, self))) diff --git a/lib/importlib_metadata/_collections.py b/lib/importlib_metadata/_collections.py new file mode 100644 index 00000000..cf0954e1 --- /dev/null +++ b/lib/importlib_metadata/_collections.py @@ -0,0 +1,30 @@ +import collections + + +# from jaraco.collections 3.3 +class FreezableDefaultDict(collections.defaultdict): + """ + Often it is desirable to prevent the mutation of + a default dict after its initial construction, such + as to prevent mutation during iteration. + + >>> dd = FreezableDefaultDict(list) + >>> dd[0].append('1') + >>> dd.freeze() + >>> dd[1] + [] + >>> len(dd) + 1 + """ + + def __missing__(self, key): + return getattr(self, '_frozen', super().__missing__)(key) + + def freeze(self): + self._frozen = lambda key: self.default_factory() + + +class Pair(collections.namedtuple('Pair', 'name value')): + @classmethod + def parse(cls, text): + return cls(*map(str.strip, text.split("=", 1))) diff --git a/lib/importlib_metadata/_compat.py b/lib/importlib_metadata/_compat.py new file mode 100644 index 00000000..3d78566e --- /dev/null +++ b/lib/importlib_metadata/_compat.py @@ -0,0 +1,72 @@ +import sys +import platform + + +__all__ = ['install', 'NullFinder', 'Protocol'] + + +try: + from typing import Protocol +except ImportError: # pragma: no cover + # Python 3.7 compatibility + from typing_extensions import Protocol # type: ignore + + +def install(cls): + """ + Class decorator for installation on sys.meta_path. + + Adds the backport DistributionFinder to sys.meta_path and + attempts to disable the finder functionality of the stdlib + DistributionFinder. + """ + sys.meta_path.append(cls()) + disable_stdlib_finder() + return cls + + +def disable_stdlib_finder(): + """ + Give the backport primacy for discovering path-based distributions + by monkey-patching the stdlib O_O. + + See #91 for more background for rationale on this sketchy + behavior. + """ + + def matches(finder): + return getattr( + finder, '__module__', None + ) == '_frozen_importlib_external' and hasattr(finder, 'find_distributions') + + for finder in filter(matches, sys.meta_path): # pragma: nocover + del finder.find_distributions + + +class NullFinder: + """ + A "Finder" (aka "MetaClassFinder") that never finds any modules, + but may find distributions. + """ + + @staticmethod + def find_spec(*args, **kwargs): + return None + + # In Python 2, the import system requires finders + # to have a find_module() method, but this usage + # is deprecated in Python 3 in favor of find_spec(). + # For the purposes of this finder (i.e. being present + # on sys.meta_path but having no other import + # system functionality), the two methods are identical. + find_module = find_spec + + +def pypy_partial(val): + """ + Adjust for variable stacklevel on partial under PyPy. + + Workaround for #327. + """ + is_pypy = platform.python_implementation() == 'PyPy' + return val + is_pypy diff --git a/lib/importlib_metadata/_functools.py b/lib/importlib_metadata/_functools.py new file mode 100644 index 00000000..71f66bd0 --- /dev/null +++ b/lib/importlib_metadata/_functools.py @@ -0,0 +1,104 @@ +import types +import functools + + +# from jaraco.functools 3.3 +def method_cache(method, cache_wrapper=None): + """ + Wrap lru_cache to support storing the cache data in the object instances. + + Abstracts the common paradigm where the method explicitly saves an + underscore-prefixed protected property on first call and returns that + subsequently. + + >>> class MyClass: + ... calls = 0 + ... + ... @method_cache + ... def method(self, value): + ... self.calls += 1 + ... return value + + >>> a = MyClass() + >>> a.method(3) + 3 + >>> for x in range(75): + ... res = a.method(x) + >>> a.calls + 75 + + Note that the apparent behavior will be exactly like that of lru_cache + except that the cache is stored on each instance, so values in one + instance will not flush values from another, and when an instance is + deleted, so are the cached values for that instance. + + >>> b = MyClass() + >>> for x in range(35): + ... res = b.method(x) + >>> b.calls + 35 + >>> a.method(0) + 0 + >>> a.calls + 75 + + Note that if method had been decorated with ``functools.lru_cache()``, + a.calls would have been 76 (due to the cached value of 0 having been + flushed by the 'b' instance). + + Clear the cache with ``.cache_clear()`` + + >>> a.method.cache_clear() + + Same for a method that hasn't yet been called. + + >>> c = MyClass() + >>> c.method.cache_clear() + + Another cache wrapper may be supplied: + + >>> cache = functools.lru_cache(maxsize=2) + >>> MyClass.method2 = method_cache(lambda self: 3, cache_wrapper=cache) + >>> a = MyClass() + >>> a.method2() + 3 + + Caution - do not subsequently wrap the method with another decorator, such + as ``@property``, which changes the semantics of the function. + + See also + http://code.activestate.com/recipes/577452-a-memoize-decorator-for-instance-methods/ + for another implementation and additional justification. + """ + cache_wrapper = cache_wrapper or functools.lru_cache() + + def wrapper(self, *args, **kwargs): + # it's the first call, replace the method with a cached, bound method + bound_method = types.MethodType(method, self) + cached_method = cache_wrapper(bound_method) + setattr(self, method.__name__, cached_method) + return cached_method(*args, **kwargs) + + # Support cache clear even before cache has been created. + wrapper.cache_clear = lambda: None + + return wrapper + + +# From jaraco.functools 3.3 +def pass_none(func): + """ + Wrap func so it's not called if its first param is None + + >>> print_text = pass_none(print) + >>> print_text('text') + text + >>> print_text(None) + """ + + @functools.wraps(func) + def wrapper(param, *args, **kwargs): + if param is not None: + return func(param, *args, **kwargs) + + return wrapper diff --git a/lib/importlib_metadata/_itertools.py b/lib/importlib_metadata/_itertools.py new file mode 100644 index 00000000..d4ca9b91 --- /dev/null +++ b/lib/importlib_metadata/_itertools.py @@ -0,0 +1,73 @@ +from itertools import filterfalse + + +def unique_everseen(iterable, key=None): + "List unique elements, preserving order. Remember all elements ever seen." + # unique_everseen('AAAABBBCCDAABBB') --> A B C D + # unique_everseen('ABBCcAD', str.lower) --> A B C D + seen = set() + seen_add = seen.add + if key is None: + for element in filterfalse(seen.__contains__, iterable): + seen_add(element) + yield element + else: + for element in iterable: + k = key(element) + if k not in seen: + seen_add(k) + yield element + + +# copied from more_itertools 8.8 +def always_iterable(obj, base_type=(str, bytes)): + """If *obj* is iterable, return an iterator over its items:: + + >>> obj = (1, 2, 3) + >>> list(always_iterable(obj)) + [1, 2, 3] + + If *obj* is not iterable, return a one-item iterable containing *obj*:: + + >>> obj = 1 + >>> list(always_iterable(obj)) + [1] + + If *obj* is ``None``, return an empty iterable: + + >>> obj = None + >>> list(always_iterable(None)) + [] + + By default, binary and text strings are not considered iterable:: + + >>> obj = 'foo' + >>> list(always_iterable(obj)) + ['foo'] + + If *base_type* is set, objects for which ``isinstance(obj, base_type)`` + returns ``True`` won't be considered iterable. + + >>> obj = {'a': 1} + >>> list(always_iterable(obj)) # Iterate over the dict's keys + ['a'] + >>> list(always_iterable(obj, base_type=dict)) # Treat dicts as a unit + [{'a': 1}] + + Set *base_type* to ``None`` to avoid any special handling and treat objects + Python considers iterable as iterable: + + >>> obj = 'foo' + >>> list(always_iterable(obj, base_type=None)) + ['f', 'o', 'o'] + """ + if obj is None: + return iter(()) + + if (base_type is not None) and isinstance(obj, base_type): + return iter((obj,)) + + try: + return iter(obj) + except TypeError: + return iter((obj,)) diff --git a/lib/importlib_metadata/_meta.py b/lib/importlib_metadata/_meta.py new file mode 100644 index 00000000..259b15ba --- /dev/null +++ b/lib/importlib_metadata/_meta.py @@ -0,0 +1,49 @@ +from ._compat import Protocol +from typing import Any, Dict, Iterator, List, TypeVar, Union + + +_T = TypeVar("_T") + + +class PackageMetadata(Protocol): + def __len__(self) -> int: + ... # pragma: no cover + + def __contains__(self, item: str) -> bool: + ... # pragma: no cover + + def __getitem__(self, key: str) -> str: + ... # pragma: no cover + + def __iter__(self) -> Iterator[str]: + ... # pragma: no cover + + def get_all(self, name: str, failobj: _T = ...) -> Union[List[Any], _T]: + """ + Return all values associated with a possibly multi-valued key. + """ + + @property + def json(self) -> Dict[str, Union[str, List[str]]]: + """ + A JSON-compatible form of the metadata. + """ + + +class SimplePath(Protocol[_T]): + """ + A minimal subset of pathlib.Path required by PathDistribution. + """ + + def joinpath(self) -> _T: + ... # pragma: no cover + + def __truediv__(self, other: Union[str, _T]) -> _T: + ... # pragma: no cover + + @property + def parent(self) -> _T: + ... # pragma: no cover + + def read_text(self) -> str: + ... # pragma: no cover diff --git a/lib/importlib_metadata/_py39compat.py b/lib/importlib_metadata/_py39compat.py new file mode 100644 index 00000000..cde4558f --- /dev/null +++ b/lib/importlib_metadata/_py39compat.py @@ -0,0 +1,35 @@ +""" +Compatibility layer with Python 3.8/3.9 +""" +from typing import TYPE_CHECKING, Any, Optional + +if TYPE_CHECKING: # pragma: no cover + # Prevent circular imports on runtime. + from . import Distribution, EntryPoint +else: + Distribution = EntryPoint = Any + + +def normalized_name(dist: Distribution) -> Optional[str]: + """ + Honor name normalization for distributions that don't provide ``_normalized_name``. + """ + try: + return dist._normalized_name + except AttributeError: + from . import Prepared # -> delay to prevent circular imports. + + return Prepared.normalize(getattr(dist, "name", None) or dist.metadata['Name']) + + +def ep_matches(ep: EntryPoint, **params) -> bool: + """ + Workaround for ``EntryPoint`` objects without the ``matches`` method. + """ + try: + return ep.matches(**params) + except AttributeError: + from . import EntryPoint # -> delay to prevent circular imports. + + # Reconstruct the EntryPoint object to make sure it is compatible. + return EntryPoint(ep.name, ep.value, ep.group).matches(**params) diff --git a/lib/importlib_metadata/_text.py b/lib/importlib_metadata/_text.py new file mode 100644 index 00000000..c88cfbb2 --- /dev/null +++ b/lib/importlib_metadata/_text.py @@ -0,0 +1,99 @@ +import re + +from ._functools import method_cache + + +# from jaraco.text 3.5 +class FoldedCase(str): + """ + A case insensitive string class; behaves just like str + except compares equal when the only variation is case. + + >>> s = FoldedCase('hello world') + + >>> s == 'Hello World' + True + + >>> 'Hello World' == s + True + + >>> s != 'Hello World' + False + + >>> s.index('O') + 4 + + >>> s.split('O') + ['hell', ' w', 'rld'] + + >>> sorted(map(FoldedCase, ['GAMMA', 'alpha', 'Beta'])) + ['alpha', 'Beta', 'GAMMA'] + + Sequence membership is straightforward. + + >>> "Hello World" in [s] + True + >>> s in ["Hello World"] + True + + You may test for set inclusion, but candidate and elements + must both be folded. + + >>> FoldedCase("Hello World") in {s} + True + >>> s in {FoldedCase("Hello World")} + True + + String inclusion works as long as the FoldedCase object + is on the right. + + >>> "hello" in FoldedCase("Hello World") + True + + But not if the FoldedCase object is on the left: + + >>> FoldedCase('hello') in 'Hello World' + False + + In that case, use in_: + + >>> FoldedCase('hello').in_('Hello World') + True + + >>> FoldedCase('hello') > FoldedCase('Hello') + False + """ + + def __lt__(self, other): + return self.lower() < other.lower() + + def __gt__(self, other): + return self.lower() > other.lower() + + def __eq__(self, other): + return self.lower() == other.lower() + + def __ne__(self, other): + return self.lower() != other.lower() + + def __hash__(self): + return hash(self.lower()) + + def __contains__(self, other): + return super().lower().__contains__(other.lower()) + + def in_(self, other): + "Does self appear in other?" + return self in FoldedCase(other) + + # cache lower since it's likely to be called frequently. + @method_cache + def lower(self): + return super().lower() + + def index(self, sub): + return self.lower().index(sub.lower()) + + def split(self, splitter=' ', maxsplit=0): + pattern = re.compile(re.escape(splitter), re.I) + return pattern.split(self, maxsplit) diff --git a/lib/importlib_metadata/py.typed b/lib/importlib_metadata/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/__init__.py b/lib/importlib_resources/__init__.py new file mode 100644 index 00000000..e6b60c18 --- /dev/null +++ b/lib/importlib_resources/__init__.py @@ -0,0 +1,17 @@ +"""Read resources contained within a package.""" + +from ._common import ( + as_file, + files, + Package, +) + +from .abc import ResourceReader + + +__all__ = [ + 'Package', + 'ResourceReader', + 'as_file', + 'files', +] diff --git a/lib/importlib_resources/_adapters.py b/lib/importlib_resources/_adapters.py new file mode 100644 index 00000000..50688fbb --- /dev/null +++ b/lib/importlib_resources/_adapters.py @@ -0,0 +1,168 @@ +from contextlib import suppress +from io import TextIOWrapper + +from . import abc + + +class SpecLoaderAdapter: + """ + Adapt a package spec to adapt the underlying loader. + """ + + def __init__(self, spec, adapter=lambda spec: spec.loader): + self.spec = spec + self.loader = adapter(spec) + + def __getattr__(self, name): + return getattr(self.spec, name) + + +class TraversableResourcesLoader: + """ + Adapt a loader to provide TraversableResources. + """ + + def __init__(self, spec): + self.spec = spec + + def get_resource_reader(self, name): + return CompatibilityFiles(self.spec)._native() + + +def _io_wrapper(file, mode='r', *args, **kwargs): + if mode == 'r': + return TextIOWrapper(file, *args, **kwargs) + elif mode == 'rb': + return file + raise ValueError(f"Invalid mode value '{mode}', only 'r' and 'rb' are supported") + + +class CompatibilityFiles: + """ + Adapter for an existing or non-existent resource reader + to provide a compatibility .files(). + """ + + class SpecPath(abc.Traversable): + """ + Path tied to a module spec. + Can be read and exposes the resource reader children. + """ + + def __init__(self, spec, reader): + self._spec = spec + self._reader = reader + + def iterdir(self): + if not self._reader: + return iter(()) + return iter( + CompatibilityFiles.ChildPath(self._reader, path) + for path in self._reader.contents() + ) + + def is_file(self): + return False + + is_dir = is_file + + def joinpath(self, other): + if not self._reader: + return CompatibilityFiles.OrphanPath(other) + return CompatibilityFiles.ChildPath(self._reader, other) + + @property + def name(self): + return self._spec.name + + def open(self, mode='r', *args, **kwargs): + return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs) + + class ChildPath(abc.Traversable): + """ + Path tied to a resource reader child. + Can be read but doesn't expose any meaningful children. + """ + + def __init__(self, reader, name): + self._reader = reader + self._name = name + + def iterdir(self): + return iter(()) + + def is_file(self): + return self._reader.is_resource(self.name) + + def is_dir(self): + return not self.is_file() + + def joinpath(self, other): + return CompatibilityFiles.OrphanPath(self.name, other) + + @property + def name(self): + return self._name + + def open(self, mode='r', *args, **kwargs): + return _io_wrapper( + self._reader.open_resource(self.name), mode, *args, **kwargs + ) + + class OrphanPath(abc.Traversable): + """ + Orphan path, not tied to a module spec or resource reader. + Can't be read and doesn't expose any meaningful children. + """ + + def __init__(self, *path_parts): + if len(path_parts) < 1: + raise ValueError('Need at least one path part to construct a path') + self._path = path_parts + + def iterdir(self): + return iter(()) + + def is_file(self): + return False + + is_dir = is_file + + def joinpath(self, other): + return CompatibilityFiles.OrphanPath(*self._path, other) + + @property + def name(self): + return self._path[-1] + + def open(self, mode='r', *args, **kwargs): + raise FileNotFoundError("Can't open orphan path") + + def __init__(self, spec): + self.spec = spec + + @property + def _reader(self): + with suppress(AttributeError): + return self.spec.loader.get_resource_reader(self.spec.name) + + def _native(self): + """ + Return the native reader if it supports files(). + """ + reader = self._reader + return reader if hasattr(reader, 'files') else self + + def __getattr__(self, attr): + return getattr(self._reader, attr) + + def files(self): + return CompatibilityFiles.SpecPath(self.spec, self._reader) + + +def wrap_spec(package): + """ + Construct a package spec with traversable compatibility + on the spec/loader/reader. + """ + return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader) diff --git a/lib/importlib_resources/_common.py b/lib/importlib_resources/_common.py new file mode 100644 index 00000000..3c6de1cf --- /dev/null +++ b/lib/importlib_resources/_common.py @@ -0,0 +1,207 @@ +import os +import pathlib +import tempfile +import functools +import contextlib +import types +import importlib +import inspect +import warnings +import itertools + +from typing import Union, Optional, cast +from .abc import ResourceReader, Traversable + +from ._compat import wrap_spec + +Package = Union[types.ModuleType, str] +Anchor = Package + + +def package_to_anchor(func): + """ + Replace 'package' parameter as 'anchor' and warn about the change. + + Other errors should fall through. + + >>> files('a', 'b') + Traceback (most recent call last): + TypeError: files() takes from 0 to 1 positional arguments but 2 were given + """ + undefined = object() + + @functools.wraps(func) + def wrapper(anchor=undefined, package=undefined): + if package is not undefined: + if anchor is not undefined: + return func(anchor, package) + warnings.warn( + "First parameter to files is renamed to 'anchor'", + DeprecationWarning, + stacklevel=2, + ) + return func(package) + elif anchor is undefined: + return func() + return func(anchor) + + return wrapper + + +@package_to_anchor +def files(anchor: Optional[Anchor] = None) -> Traversable: + """ + Get a Traversable resource for an anchor. + """ + return from_package(resolve(anchor)) + + +def get_resource_reader(package: types.ModuleType) -> Optional[ResourceReader]: + """ + Return the package's loader if it's a ResourceReader. + """ + # We can't use + # a issubclass() check here because apparently abc.'s __subclasscheck__() + # hook wants to create a weak reference to the object, but + # zipimport.zipimporter does not support weak references, resulting in a + # TypeError. That seems terrible. + spec = package.__spec__ + reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore + if reader is None: + return None + return reader(spec.name) # type: ignore + + +@functools.singledispatch +def resolve(cand: Optional[Anchor]) -> types.ModuleType: + return cast(types.ModuleType, cand) + + +@resolve.register +def _(cand: str) -> types.ModuleType: + return importlib.import_module(cand) + + +@resolve.register +def _(cand: None) -> types.ModuleType: + return resolve(_infer_caller().f_globals['__name__']) + + +def _infer_caller(): + """ + Walk the stack and find the frame of the first caller not in this module. + """ + + def is_this_file(frame_info): + return frame_info.filename == __file__ + + def is_wrapper(frame_info): + return frame_info.function == 'wrapper' + + not_this_file = itertools.filterfalse(is_this_file, inspect.stack()) + # also exclude 'wrapper' due to singledispatch in the call stack + callers = itertools.filterfalse(is_wrapper, not_this_file) + return next(callers).frame + + +def from_package(package: types.ModuleType): + """ + Return a Traversable object for the given package. + + """ + spec = wrap_spec(package) + reader = spec.loader.get_resource_reader(spec.name) + return reader.files() + + +@contextlib.contextmanager +def _tempfile( + reader, + suffix='', + # gh-93353: Keep a reference to call os.remove() in late Python + # finalization. + *, + _os_remove=os.remove, +): + # Not using tempfile.NamedTemporaryFile as it leads to deeper 'try' + # blocks due to the need to close the temporary file to work on Windows + # properly. + fd, raw_path = tempfile.mkstemp(suffix=suffix) + try: + try: + os.write(fd, reader()) + finally: + os.close(fd) + del reader + yield pathlib.Path(raw_path) + finally: + try: + _os_remove(raw_path) + except FileNotFoundError: + pass + + +def _temp_file(path): + return _tempfile(path.read_bytes, suffix=path.name) + + +def _is_present_dir(path: Traversable) -> bool: + """ + Some Traversables implement ``is_dir()`` to raise an + exception (i.e. ``FileNotFoundError``) when the + directory doesn't exist. This function wraps that call + to always return a boolean and only return True + if there's a dir and it exists. + """ + with contextlib.suppress(FileNotFoundError): + return path.is_dir() + return False + + +@functools.singledispatch +def as_file(path): + """ + Given a Traversable object, return that object as a + path on the local file system in a context manager. + """ + return _temp_dir(path) if _is_present_dir(path) else _temp_file(path) + + +@as_file.register(pathlib.Path) +@contextlib.contextmanager +def _(path): + """ + Degenerate behavior for pathlib.Path objects. + """ + yield path + + +@contextlib.contextmanager +def _temp_path(dir: tempfile.TemporaryDirectory): + """ + Wrap tempfile.TemporyDirectory to return a pathlib object. + """ + with dir as result: + yield pathlib.Path(result) + + +@contextlib.contextmanager +def _temp_dir(path): + """ + Given a traversable dir, recursively replicate the whole tree + to the file system in a context manager. + """ + assert path.is_dir() + with _temp_path(tempfile.TemporaryDirectory()) as temp_dir: + yield _write_contents(temp_dir, path) + + +def _write_contents(target, source): + child = target.joinpath(source.name) + if source.is_dir(): + child.mkdir() + for item in source.iterdir(): + _write_contents(child, item) + else: + child.write_bytes(source.read_bytes()) + return child diff --git a/lib/importlib_resources/_compat.py b/lib/importlib_resources/_compat.py new file mode 100644 index 00000000..a93a8826 --- /dev/null +++ b/lib/importlib_resources/_compat.py @@ -0,0 +1,109 @@ +# flake8: noqa + +import abc +import os +import sys +import pathlib +from contextlib import suppress +from typing import Union + + +if sys.version_info >= (3, 10): + from zipfile import Path as ZipPath # type: ignore +else: + from zipp import Path as ZipPath # type: ignore + + +try: + from typing import runtime_checkable # type: ignore +except ImportError: + + def runtime_checkable(cls): # type: ignore + return cls + + +try: + from typing import Protocol # type: ignore +except ImportError: + Protocol = abc.ABC # type: ignore + + +class TraversableResourcesLoader: + """ + Adapt loaders to provide TraversableResources and other + compatibility. + + Used primarily for Python 3.9 and earlier where the native + loaders do not yet implement TraversableResources. + """ + + def __init__(self, spec): + self.spec = spec + + @property + def path(self): + return self.spec.origin + + def get_resource_reader(self, name): + from . import readers, _adapters + + def _zip_reader(spec): + with suppress(AttributeError): + return readers.ZipReader(spec.loader, spec.name) + + def _namespace_reader(spec): + with suppress(AttributeError, ValueError): + return readers.NamespaceReader(spec.submodule_search_locations) + + def _available_reader(spec): + with suppress(AttributeError): + return spec.loader.get_resource_reader(spec.name) + + def _native_reader(spec): + reader = _available_reader(spec) + return reader if hasattr(reader, 'files') else None + + def _file_reader(spec): + try: + path = pathlib.Path(self.path) + except TypeError: + return None + if path.exists(): + return readers.FileReader(self) + + return ( + # local ZipReader if a zip module + _zip_reader(self.spec) + or + # local NamespaceReader if a namespace module + _namespace_reader(self.spec) + or + # local FileReader + _file_reader(self.spec) + or + # native reader if it supplies 'files' + _native_reader(self.spec) + or + # fallback - adapt the spec ResourceReader to TraversableReader + _adapters.CompatibilityFiles(self.spec) + ) + + +def wrap_spec(package): + """ + Construct a package spec with traversable compatibility + on the spec/loader/reader. + + Supersedes _adapters.wrap_spec to use TraversableResourcesLoader + from above for older Python compatibility (<3.10). + """ + from . import _adapters + + return _adapters.SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader) + + +if sys.version_info >= (3, 9): + StrPath = Union[str, os.PathLike[str]] +else: + # PathLike is only subscriptable at runtime in 3.9+ + StrPath = Union[str, "os.PathLike[str]"] diff --git a/lib/importlib_resources/_itertools.py b/lib/importlib_resources/_itertools.py new file mode 100644 index 00000000..7b775ef5 --- /dev/null +++ b/lib/importlib_resources/_itertools.py @@ -0,0 +1,38 @@ +# from more_itertools 9.0 +def only(iterable, default=None, too_long=None): + """If *iterable* has only one item, return it. + If it has zero items, return *default*. + If it has more than one item, raise the exception given by *too_long*, + which is ``ValueError`` by default. + >>> only([], default='missing') + 'missing' + >>> only([1]) + 1 + >>> only([1, 2]) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + ValueError: Expected exactly one item in iterable, but got 1, 2, + and perhaps more.' + >>> only([1, 2], too_long=TypeError) # doctest: +IGNORE_EXCEPTION_DETAIL + Traceback (most recent call last): + ... + TypeError + Note that :func:`only` attempts to advance *iterable* twice to ensure there + is only one item. See :func:`spy` or :func:`peekable` to check + iterable contents less destructively. + """ + it = iter(iterable) + first_value = next(it, default) + + try: + second_value = next(it) + except StopIteration: + pass + else: + msg = ( + 'Expected exactly one item in iterable, but got {!r}, {!r}, ' + 'and perhaps more.'.format(first_value, second_value) + ) + raise too_long or ValueError(msg) + + return first_value diff --git a/lib/importlib_resources/abc.py b/lib/importlib_resources/abc.py new file mode 100644 index 00000000..23b6aeaf --- /dev/null +++ b/lib/importlib_resources/abc.py @@ -0,0 +1,170 @@ +import abc +import io +import itertools +import pathlib +from typing import Any, BinaryIO, Iterable, Iterator, NoReturn, Text, Optional + +from ._compat import runtime_checkable, Protocol, StrPath + + +__all__ = ["ResourceReader", "Traversable", "TraversableResources"] + + +class ResourceReader(metaclass=abc.ABCMeta): + """Abstract base class for loaders to provide resource reading support.""" + + @abc.abstractmethod + def open_resource(self, resource: Text) -> BinaryIO: + """Return an opened, file-like object for binary reading. + + The 'resource' argument is expected to represent only a file name. + If the resource cannot be found, FileNotFoundError is raised. + """ + # This deliberately raises FileNotFoundError instead of + # NotImplementedError so that if this method is accidentally called, + # it'll still do the right thing. + raise FileNotFoundError + + @abc.abstractmethod + def resource_path(self, resource: Text) -> Text: + """Return the file system path to the specified resource. + + The 'resource' argument is expected to represent only a file name. + If the resource does not exist on the file system, raise + FileNotFoundError. + """ + # This deliberately raises FileNotFoundError instead of + # NotImplementedError so that if this method is accidentally called, + # it'll still do the right thing. + raise FileNotFoundError + + @abc.abstractmethod + def is_resource(self, path: Text) -> bool: + """Return True if the named 'path' is a resource. + + Files are resources, directories are not. + """ + raise FileNotFoundError + + @abc.abstractmethod + def contents(self) -> Iterable[str]: + """Return an iterable of entries in `package`.""" + raise FileNotFoundError + + +class TraversalError(Exception): + pass + + +@runtime_checkable +class Traversable(Protocol): + """ + An object with a subset of pathlib.Path methods suitable for + traversing directories and opening files. + + Any exceptions that occur when accessing the backing resource + may propagate unaltered. + """ + + @abc.abstractmethod + def iterdir(self) -> Iterator["Traversable"]: + """ + Yield Traversable objects in self + """ + + def read_bytes(self) -> bytes: + """ + Read contents of self as bytes + """ + with self.open('rb') as strm: + return strm.read() + + def read_text(self, encoding: Optional[str] = None) -> str: + """ + Read contents of self as text + """ + with self.open(encoding=encoding) as strm: + return strm.read() + + @abc.abstractmethod + def is_dir(self) -> bool: + """ + Return True if self is a directory + """ + + @abc.abstractmethod + def is_file(self) -> bool: + """ + Return True if self is a file + """ + + def joinpath(self, *descendants: StrPath) -> "Traversable": + """ + Return Traversable resolved with any descendants applied. + + Each descendant should be a path segment relative to self + and each may contain multiple levels separated by + ``posixpath.sep`` (``/``). + """ + if not descendants: + return self + names = itertools.chain.from_iterable( + path.parts for path in map(pathlib.PurePosixPath, descendants) + ) + target = next(names) + matches = ( + traversable for traversable in self.iterdir() if traversable.name == target + ) + try: + match = next(matches) + except StopIteration: + raise TraversalError( + "Target not found during traversal.", target, list(names) + ) + return match.joinpath(*names) + + def __truediv__(self, child: StrPath) -> "Traversable": + """ + Return Traversable child in self + """ + return self.joinpath(child) + + @abc.abstractmethod + def open(self, mode='r', *args, **kwargs): + """ + mode may be 'r' or 'rb' to open as text or binary. Return a handle + suitable for reading (same as pathlib.Path.open). + + When opening as text, accepts encoding parameters such as those + accepted by io.TextIOWrapper. + """ + + @property + @abc.abstractmethod + def name(self) -> str: + """ + The base name of this object without any parent references. + """ + + +class TraversableResources(ResourceReader): + """ + The required interface for providing traversable + resources. + """ + + @abc.abstractmethod + def files(self) -> "Traversable": + """Return a Traversable object for the loaded package.""" + + def open_resource(self, resource: StrPath) -> io.BufferedReader: + return self.files().joinpath(resource).open('rb') + + def resource_path(self, resource: Any) -> NoReturn: + raise FileNotFoundError(resource) + + def is_resource(self, path: StrPath) -> bool: + return self.files().joinpath(path).is_file() + + def contents(self) -> Iterator[str]: + return (item.name for item in self.files().iterdir()) diff --git a/lib/importlib_resources/py.typed b/lib/importlib_resources/py.typed new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/readers.py b/lib/importlib_resources/readers.py new file mode 100644 index 00000000..51d030a6 --- /dev/null +++ b/lib/importlib_resources/readers.py @@ -0,0 +1,144 @@ +import collections +import itertools +import pathlib +import operator + +from . import abc + +from ._itertools import only +from ._compat import ZipPath + + +def remove_duplicates(items): + return iter(collections.OrderedDict.fromkeys(items)) + + +class FileReader(abc.TraversableResources): + def __init__(self, loader): + self.path = pathlib.Path(loader.path).parent + + def resource_path(self, resource): + """ + Return the file system path to prevent + `resources.path()` from creating a temporary + copy. + """ + return str(self.path.joinpath(resource)) + + def files(self): + return self.path + + +class ZipReader(abc.TraversableResources): + def __init__(self, loader, module): + _, _, name = module.rpartition('.') + self.prefix = loader.prefix.replace('\\', '/') + name + '/' + self.archive = loader.archive + + def open_resource(self, resource): + try: + return super().open_resource(resource) + except KeyError as exc: + raise FileNotFoundError(exc.args[0]) + + def is_resource(self, path): + """ + Workaround for `zipfile.Path.is_file` returning true + for non-existent paths. + """ + target = self.files().joinpath(path) + return target.is_file() and target.exists() + + def files(self): + return ZipPath(self.archive, self.prefix) + + +class MultiplexedPath(abc.Traversable): + """ + Given a series of Traversable objects, implement a merged + version of the interface across all objects. Useful for + namespace packages which may be multihomed at a single + name. + """ + + def __init__(self, *paths): + self._paths = list(map(pathlib.Path, remove_duplicates(paths))) + if not self._paths: + message = 'MultiplexedPath must contain at least one path' + raise FileNotFoundError(message) + if not all(path.is_dir() for path in self._paths): + raise NotADirectoryError('MultiplexedPath only supports directories') + + def iterdir(self): + children = (child for path in self._paths for child in path.iterdir()) + by_name = operator.attrgetter('name') + groups = itertools.groupby(sorted(children, key=by_name), key=by_name) + return map(self._follow, (locs for name, locs in groups)) + + def read_bytes(self): + raise FileNotFoundError(f'{self} is not a file') + + def read_text(self, *args, **kwargs): + raise FileNotFoundError(f'{self} is not a file') + + def is_dir(self): + return True + + def is_file(self): + return False + + def joinpath(self, *descendants): + try: + return super().joinpath(*descendants) + except abc.TraversalError: + # One of the paths did not resolve (a directory does not exist). + # Just return something that will not exist. + return self._paths[0].joinpath(*descendants) + + @classmethod + def _follow(cls, children): + """ + Construct a MultiplexedPath if needed. + + If children contains a sole element, return it. + Otherwise, return a MultiplexedPath of the items. + Unless one of the items is not a Directory, then return the first. + """ + subdirs, one_dir, one_file = itertools.tee(children, 3) + + try: + return only(one_dir) + except ValueError: + try: + return cls(*subdirs) + except NotADirectoryError: + return next(one_file) + + def open(self, *args, **kwargs): + raise FileNotFoundError(f'{self} is not a file') + + @property + def name(self): + return self._paths[0].name + + def __repr__(self): + paths = ', '.join(f"'{path}'" for path in self._paths) + return f'MultiplexedPath({paths})' + + +class NamespaceReader(abc.TraversableResources): + def __init__(self, namespace_path): + if 'NamespacePath' not in str(namespace_path): + raise ValueError('Invalid path') + self.path = MultiplexedPath(*list(namespace_path)) + + def resource_path(self, resource): + """ + Return the file system path to prevent + `resources.path()` from creating a temporary + copy. + """ + return str(self.path.joinpath(resource)) + + def files(self): + return self.path diff --git a/lib/importlib_resources/simple.py b/lib/importlib_resources/simple.py new file mode 100644 index 00000000..7770c922 --- /dev/null +++ b/lib/importlib_resources/simple.py @@ -0,0 +1,106 @@ +""" +Interface adapters for low-level readers. +""" + +import abc +import io +import itertools +from typing import BinaryIO, List + +from .abc import Traversable, TraversableResources + + +class SimpleReader(abc.ABC): + """ + The minimum, low-level interface required from a resource + provider. + """ + + @property + @abc.abstractmethod + def package(self) -> str: + """ + The name of the package for which this reader loads resources. + """ + + @abc.abstractmethod + def children(self) -> List['SimpleReader']: + """ + Obtain an iterable of SimpleReader for available + child containers (e.g. directories). + """ + + @abc.abstractmethod + def resources(self) -> List[str]: + """ + Obtain available named resources for this virtual package. + """ + + @abc.abstractmethod + def open_binary(self, resource: str) -> BinaryIO: + """ + Obtain a File-like for a named resource. + """ + + @property + def name(self): + return self.package.split('.')[-1] + + +class ResourceContainer(Traversable): + """ + Traversable container for a package's resources via its reader. + """ + + def __init__(self, reader: SimpleReader): + self.reader = reader + + def is_dir(self): + return True + + def is_file(self): + return False + + def iterdir(self): + files = (ResourceHandle(self, name) for name in self.reader.resources) + dirs = map(ResourceContainer, self.reader.children()) + return itertools.chain(files, dirs) + + def open(self, *args, **kwargs): + raise IsADirectoryError() + + +class ResourceHandle(Traversable): + """ + Handle to a named resource in a ResourceReader. + """ + + def __init__(self, parent: ResourceContainer, name: str): + self.parent = parent + self.name = name # type: ignore + + def is_file(self): + return True + + def is_dir(self): + return False + + def open(self, mode='r', *args, **kwargs): + stream = self.parent.reader.open_binary(self.name) + if 'b' not in mode: + stream = io.TextIOWrapper(*args, **kwargs) + return stream + + def joinpath(self, name): + raise RuntimeError("Cannot traverse into a resource") + + +class TraversableReader(TraversableResources, SimpleReader): + """ + A TraversableResources based on SimpleReader. Resource providers + may derive from this class to provide the TraversableResources + interface by supplying the SimpleReader interface. + """ + + def files(self): + return ResourceContainer(self) diff --git a/lib/importlib_resources/tests/__init__.py b/lib/importlib_resources/tests/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/_compat.py b/lib/importlib_resources/tests/_compat.py new file mode 100644 index 00000000..e7bf06dd --- /dev/null +++ b/lib/importlib_resources/tests/_compat.py @@ -0,0 +1,32 @@ +import os + + +try: + from test.support import import_helper # type: ignore +except ImportError: + # Python 3.9 and earlier + class import_helper: # type: ignore + from test.support import ( + modules_setup, + modules_cleanup, + DirsOnSysPath, + CleanImport, + ) + + +try: + from test.support import os_helper # type: ignore +except ImportError: + # Python 3.9 compat + class os_helper: # type:ignore + from test.support import temp_dir + + +try: + # Python 3.10 + from test.support.os_helper import unlink +except ImportError: + from test.support import unlink as _unlink + + def unlink(target): + return _unlink(os.fspath(target)) diff --git a/lib/importlib_resources/tests/_path.py b/lib/importlib_resources/tests/_path.py new file mode 100644 index 00000000..1f97c961 --- /dev/null +++ b/lib/importlib_resources/tests/_path.py @@ -0,0 +1,56 @@ +import pathlib +import functools + +from typing import Dict, Union + + +#### +# from jaraco.path 3.4.1 + +FilesSpec = Dict[str, Union[str, bytes, 'FilesSpec']] # type: ignore + + +def build(spec: FilesSpec, prefix=pathlib.Path()): + """ + Build a set of files/directories, as described by the spec. + + Each key represents a pathname, and the value represents + the content. Content may be a nested directory. + + >>> spec = { + ... 'README.txt': "A README file", + ... "foo": { + ... "__init__.py": "", + ... "bar": { + ... "__init__.py": "", + ... }, + ... "baz.py": "# Some code", + ... } + ... } + >>> target = getfixture('tmp_path') + >>> build(spec, target) + >>> target.joinpath('foo/baz.py').read_text(encoding='utf-8') + '# Some code' + """ + for name, contents in spec.items(): + create(contents, pathlib.Path(prefix) / name) + + +@functools.singledispatch +def create(content: Union[str, bytes, FilesSpec], path): + path.mkdir(exist_ok=True) + build(content, prefix=path) # type: ignore + + +@create.register +def _(content: bytes, path): + path.write_bytes(content) + + +@create.register +def _(content: str, path): + path.write_text(content, encoding='utf-8') + + +# end from jaraco.path +#### diff --git a/lib/importlib_resources/tests/data01/__init__.py b/lib/importlib_resources/tests/data01/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/data01/binary.file b/lib/importlib_resources/tests/data01/binary.file new file mode 100644 index 00000000..eaf36c1d Binary files /dev/null and b/lib/importlib_resources/tests/data01/binary.file differ diff --git a/lib/importlib_resources/tests/data01/subdirectory/__init__.py b/lib/importlib_resources/tests/data01/subdirectory/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/data01/subdirectory/binary.file b/lib/importlib_resources/tests/data01/subdirectory/binary.file new file mode 100644 index 00000000..eaf36c1d Binary files /dev/null and b/lib/importlib_resources/tests/data01/subdirectory/binary.file differ diff --git a/lib/importlib_resources/tests/data01/utf-16.file b/lib/importlib_resources/tests/data01/utf-16.file new file mode 100644 index 00000000..2cb77229 Binary files /dev/null and b/lib/importlib_resources/tests/data01/utf-16.file differ diff --git a/lib/importlib_resources/tests/data01/utf-8.file b/lib/importlib_resources/tests/data01/utf-8.file new file mode 100644 index 00000000..1c0132ad --- /dev/null +++ b/lib/importlib_resources/tests/data01/utf-8.file @@ -0,0 +1 @@ +Hello, UTF-8 world! diff --git a/lib/importlib_resources/tests/data02/__init__.py b/lib/importlib_resources/tests/data02/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/data02/one/__init__.py b/lib/importlib_resources/tests/data02/one/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/data02/one/resource1.txt b/lib/importlib_resources/tests/data02/one/resource1.txt new file mode 100644 index 00000000..61a813e4 --- /dev/null +++ b/lib/importlib_resources/tests/data02/one/resource1.txt @@ -0,0 +1 @@ +one resource diff --git a/lib/importlib_resources/tests/data02/subdirectory/subsubdir/resource.txt b/lib/importlib_resources/tests/data02/subdirectory/subsubdir/resource.txt new file mode 100644 index 00000000..48f587a2 --- /dev/null +++ b/lib/importlib_resources/tests/data02/subdirectory/subsubdir/resource.txt @@ -0,0 +1 @@ +a resource \ No newline at end of file diff --git a/lib/importlib_resources/tests/data02/two/__init__.py b/lib/importlib_resources/tests/data02/two/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/data02/two/resource2.txt b/lib/importlib_resources/tests/data02/two/resource2.txt new file mode 100644 index 00000000..a80ce46e --- /dev/null +++ b/lib/importlib_resources/tests/data02/two/resource2.txt @@ -0,0 +1 @@ +two resource diff --git a/lib/importlib_resources/tests/namespacedata01/binary.file b/lib/importlib_resources/tests/namespacedata01/binary.file new file mode 100644 index 00000000..eaf36c1d Binary files /dev/null and b/lib/importlib_resources/tests/namespacedata01/binary.file differ diff --git a/lib/importlib_resources/tests/namespacedata01/utf-16.file b/lib/importlib_resources/tests/namespacedata01/utf-16.file new file mode 100644 index 00000000..2cb77229 Binary files /dev/null and b/lib/importlib_resources/tests/namespacedata01/utf-16.file differ diff --git a/lib/importlib_resources/tests/namespacedata01/utf-8.file b/lib/importlib_resources/tests/namespacedata01/utf-8.file new file mode 100644 index 00000000..1c0132ad --- /dev/null +++ b/lib/importlib_resources/tests/namespacedata01/utf-8.file @@ -0,0 +1 @@ +Hello, UTF-8 world! diff --git a/lib/importlib_resources/tests/test_compatibilty_files.py b/lib/importlib_resources/tests/test_compatibilty_files.py new file mode 100644 index 00000000..13ad0dfb --- /dev/null +++ b/lib/importlib_resources/tests/test_compatibilty_files.py @@ -0,0 +1,104 @@ +import io +import unittest + +import importlib_resources as resources + +from importlib_resources._adapters import ( + CompatibilityFiles, + wrap_spec, +) + +from . import util + + +class CompatibilityFilesTests(unittest.TestCase): + @property + def package(self): + bytes_data = io.BytesIO(b'Hello, world!') + return util.create_package( + file=bytes_data, + path='some_path', + contents=('a', 'b', 'c'), + ) + + @property + def files(self): + return resources.files(self.package) + + def test_spec_path_iter(self): + self.assertEqual( + sorted(path.name for path in self.files.iterdir()), + ['a', 'b', 'c'], + ) + + def test_child_path_iter(self): + self.assertEqual(list((self.files / 'a').iterdir()), []) + + def test_orphan_path_iter(self): + self.assertEqual(list((self.files / 'a' / 'a').iterdir()), []) + self.assertEqual(list((self.files / 'a' / 'a' / 'a').iterdir()), []) + + def test_spec_path_is(self): + self.assertFalse(self.files.is_file()) + self.assertFalse(self.files.is_dir()) + + def test_child_path_is(self): + self.assertTrue((self.files / 'a').is_file()) + self.assertFalse((self.files / 'a').is_dir()) + + def test_orphan_path_is(self): + self.assertFalse((self.files / 'a' / 'a').is_file()) + self.assertFalse((self.files / 'a' / 'a').is_dir()) + self.assertFalse((self.files / 'a' / 'a' / 'a').is_file()) + self.assertFalse((self.files / 'a' / 'a' / 'a').is_dir()) + + def test_spec_path_name(self): + self.assertEqual(self.files.name, 'testingpackage') + + def test_child_path_name(self): + self.assertEqual((self.files / 'a').name, 'a') + + def test_orphan_path_name(self): + self.assertEqual((self.files / 'a' / 'b').name, 'b') + self.assertEqual((self.files / 'a' / 'b' / 'c').name, 'c') + + def test_spec_path_open(self): + self.assertEqual(self.files.read_bytes(), b'Hello, world!') + self.assertEqual(self.files.read_text(encoding='utf-8'), 'Hello, world!') + + def test_child_path_open(self): + self.assertEqual((self.files / 'a').read_bytes(), b'Hello, world!') + self.assertEqual( + (self.files / 'a').read_text(encoding='utf-8'), 'Hello, world!' + ) + + def test_orphan_path_open(self): + with self.assertRaises(FileNotFoundError): + (self.files / 'a' / 'b').read_bytes() + with self.assertRaises(FileNotFoundError): + (self.files / 'a' / 'b' / 'c').read_bytes() + + def test_open_invalid_mode(self): + with self.assertRaises(ValueError): + self.files.open('0') + + def test_orphan_path_invalid(self): + with self.assertRaises(ValueError): + CompatibilityFiles.OrphanPath() + + def test_wrap_spec(self): + spec = wrap_spec(self.package) + self.assertIsInstance(spec.loader.get_resource_reader(None), CompatibilityFiles) + + +class CompatibilityFilesNoReaderTests(unittest.TestCase): + @property + def package(self): + return util.create_package_from_loader(None) + + @property + def files(self): + return resources.files(self.package) + + def test_spec_path_joinpath(self): + self.assertIsInstance(self.files / 'a', CompatibilityFiles.OrphanPath) diff --git a/lib/importlib_resources/tests/test_contents.py b/lib/importlib_resources/tests/test_contents.py new file mode 100644 index 00000000..525568e8 --- /dev/null +++ b/lib/importlib_resources/tests/test_contents.py @@ -0,0 +1,43 @@ +import unittest +import importlib_resources as resources + +from . import data01 +from . import util + + +class ContentsTests: + expected = { + '__init__.py', + 'binary.file', + 'subdirectory', + 'utf-16.file', + 'utf-8.file', + } + + def test_contents(self): + contents = {path.name for path in resources.files(self.data).iterdir()} + assert self.expected <= contents + + +class ContentsDiskTests(ContentsTests, unittest.TestCase): + def setUp(self): + self.data = data01 + + +class ContentsZipTests(ContentsTests, util.ZipSetup, unittest.TestCase): + pass + + +class ContentsNamespaceTests(ContentsTests, unittest.TestCase): + expected = { + # no __init__ because of namespace design + # no subdirectory as incidental difference in fixture + 'binary.file', + 'utf-16.file', + 'utf-8.file', + } + + def setUp(self): + from . import namespacedata01 + + self.data = namespacedata01 diff --git a/lib/importlib_resources/tests/test_custom.py b/lib/importlib_resources/tests/test_custom.py new file mode 100644 index 00000000..e85ddd65 --- /dev/null +++ b/lib/importlib_resources/tests/test_custom.py @@ -0,0 +1,45 @@ +import unittest +import contextlib +import pathlib + +import importlib_resources as resources +from ..abc import TraversableResources, ResourceReader +from . import util +from ._compat import os_helper + + +class SimpleLoader: + """ + A simple loader that only implements a resource reader. + """ + + def __init__(self, reader: ResourceReader): + self.reader = reader + + def get_resource_reader(self, package): + return self.reader + + +class MagicResources(TraversableResources): + """ + Magically returns the resources at path. + """ + + def __init__(self, path: pathlib.Path): + self.path = path + + def files(self): + return self.path + + +class CustomTraversableResourcesTests(unittest.TestCase): + def setUp(self): + self.fixtures = contextlib.ExitStack() + self.addCleanup(self.fixtures.close) + + def test_custom_loader(self): + temp_dir = self.fixtures.enter_context(os_helper.temp_dir()) + loader = SimpleLoader(MagicResources(temp_dir)) + pkg = util.create_package_from_loader(loader) + files = resources.files(pkg) + assert files is temp_dir diff --git a/lib/importlib_resources/tests/test_files.py b/lib/importlib_resources/tests/test_files.py new file mode 100644 index 00000000..197a063b --- /dev/null +++ b/lib/importlib_resources/tests/test_files.py @@ -0,0 +1,112 @@ +import typing +import textwrap +import unittest +import warnings +import importlib +import contextlib + +import importlib_resources as resources +from ..abc import Traversable +from . import data01 +from . import util +from . import _path +from ._compat import os_helper, import_helper + + +@contextlib.contextmanager +def suppress_known_deprecation(): + with warnings.catch_warnings(record=True) as ctx: + warnings.simplefilter('default', category=DeprecationWarning) + yield ctx + + +class FilesTests: + def test_read_bytes(self): + files = resources.files(self.data) + actual = files.joinpath('utf-8.file').read_bytes() + assert actual == b'Hello, UTF-8 world!\n' + + def test_read_text(self): + files = resources.files(self.data) + actual = files.joinpath('utf-8.file').read_text(encoding='utf-8') + assert actual == 'Hello, UTF-8 world!\n' + + @unittest.skipUnless( + hasattr(typing, 'runtime_checkable'), + "Only suitable when typing supports runtime_checkable", + ) + def test_traversable(self): + assert isinstance(resources.files(self.data), Traversable) + + def test_old_parameter(self): + """ + Files used to take a 'package' parameter. Make sure anyone + passing by name is still supported. + """ + with suppress_known_deprecation(): + resources.files(package=self.data) + + +class OpenDiskTests(FilesTests, unittest.TestCase): + def setUp(self): + self.data = data01 + + +class OpenZipTests(FilesTests, util.ZipSetup, unittest.TestCase): + pass + + +class OpenNamespaceTests(FilesTests, unittest.TestCase): + def setUp(self): + from . import namespacedata01 + + self.data = namespacedata01 + + +class SiteDir: + def setUp(self): + self.fixtures = contextlib.ExitStack() + self.addCleanup(self.fixtures.close) + self.site_dir = self.fixtures.enter_context(os_helper.temp_dir()) + self.fixtures.enter_context(import_helper.DirsOnSysPath(self.site_dir)) + self.fixtures.enter_context(import_helper.CleanImport()) + + +class ModulesFilesTests(SiteDir, unittest.TestCase): + def test_module_resources(self): + """ + A module can have resources found adjacent to the module. + """ + spec = { + 'mod.py': '', + 'res.txt': 'resources are the best', + } + _path.build(spec, self.site_dir) + import mod + + actual = resources.files(mod).joinpath('res.txt').read_text(encoding='utf-8') + assert actual == spec['res.txt'] + + +class ImplicitContextFilesTests(SiteDir, unittest.TestCase): + def test_implicit_files(self): + """ + Without any parameter, files() will infer the location as the caller. + """ + spec = { + 'somepkg': { + '__init__.py': textwrap.dedent( + """ + import importlib_resources as res + val = res.files().joinpath('res.txt').read_text(encoding='utf-8') + """ + ), + 'res.txt': 'resources are the best', + }, + } + _path.build(spec, self.site_dir) + assert importlib.import_module('somepkg').val == 'resources are the best' + + +if __name__ == '__main__': + unittest.main() diff --git a/lib/importlib_resources/tests/test_open.py b/lib/importlib_resources/tests/test_open.py new file mode 100644 index 00000000..83b737dc --- /dev/null +++ b/lib/importlib_resources/tests/test_open.py @@ -0,0 +1,85 @@ +import unittest + +import importlib_resources as resources +from . import data01 +from . import util + + +class CommonBinaryTests(util.CommonTests, unittest.TestCase): + def execute(self, package, path): + target = resources.files(package).joinpath(path) + with target.open('rb'): + pass + + +class CommonTextTests(util.CommonTests, unittest.TestCase): + def execute(self, package, path): + target = resources.files(package).joinpath(path) + with target.open(encoding='utf-8'): + pass + + +class OpenTests: + def test_open_binary(self): + target = resources.files(self.data) / 'binary.file' + with target.open('rb') as fp: + result = fp.read() + self.assertEqual(result, b'\x00\x01\x02\x03') + + def test_open_text_default_encoding(self): + target = resources.files(self.data) / 'utf-8.file' + with target.open(encoding='utf-8') as fp: + result = fp.read() + self.assertEqual(result, 'Hello, UTF-8 world!\n') + + def test_open_text_given_encoding(self): + target = resources.files(self.data) / 'utf-16.file' + with target.open(encoding='utf-16', errors='strict') as fp: + result = fp.read() + self.assertEqual(result, 'Hello, UTF-16 world!\n') + + def test_open_text_with_errors(self): + """ + Raises UnicodeError without the 'errors' argument. + """ + target = resources.files(self.data) / 'utf-16.file' + with target.open(encoding='utf-8', errors='strict') as fp: + self.assertRaises(UnicodeError, fp.read) + with target.open(encoding='utf-8', errors='ignore') as fp: + result = fp.read() + self.assertEqual( + result, + 'H\x00e\x00l\x00l\x00o\x00,\x00 ' + '\x00U\x00T\x00F\x00-\x001\x006\x00 ' + '\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00', + ) + + def test_open_binary_FileNotFoundError(self): + target = resources.files(self.data) / 'does-not-exist' + with self.assertRaises(FileNotFoundError): + target.open('rb') + + def test_open_text_FileNotFoundError(self): + target = resources.files(self.data) / 'does-not-exist' + with self.assertRaises(FileNotFoundError): + target.open(encoding='utf-8') + + +class OpenDiskTests(OpenTests, unittest.TestCase): + def setUp(self): + self.data = data01 + + +class OpenDiskNamespaceTests(OpenTests, unittest.TestCase): + def setUp(self): + from . import namespacedata01 + + self.data = namespacedata01 + + +class OpenZipTests(OpenTests, util.ZipSetup, unittest.TestCase): + pass + + +if __name__ == '__main__': + unittest.main() diff --git a/lib/importlib_resources/tests/test_path.py b/lib/importlib_resources/tests/test_path.py new file mode 100644 index 00000000..7cb20003 --- /dev/null +++ b/lib/importlib_resources/tests/test_path.py @@ -0,0 +1,69 @@ +import io +import unittest + +import importlib_resources as resources +from . import data01 +from . import util + + +class CommonTests(util.CommonTests, unittest.TestCase): + def execute(self, package, path): + with resources.as_file(resources.files(package).joinpath(path)): + pass + + +class PathTests: + def test_reading(self): + """ + Path should be readable. + + Test also implicitly verifies the returned object is a pathlib.Path + instance. + """ + target = resources.files(self.data) / 'utf-8.file' + with resources.as_file(target) as path: + self.assertTrue(path.name.endswith("utf-8.file"), repr(path)) + # pathlib.Path.read_text() was introduced in Python 3.5. + with path.open('r', encoding='utf-8') as file: + text = file.read() + self.assertEqual('Hello, UTF-8 world!\n', text) + + +class PathDiskTests(PathTests, unittest.TestCase): + data = data01 + + def test_natural_path(self): + """ + Guarantee the internal implementation detail that + file-system-backed resources do not get the tempdir + treatment. + """ + target = resources.files(self.data) / 'utf-8.file' + with resources.as_file(target) as path: + assert 'data' in str(path) + + +class PathMemoryTests(PathTests, unittest.TestCase): + def setUp(self): + file = io.BytesIO(b'Hello, UTF-8 world!\n') + self.addCleanup(file.close) + self.data = util.create_package( + file=file, path=FileNotFoundError("package exists only in memory") + ) + self.data.__spec__.origin = None + self.data.__spec__.has_location = False + + +class PathZipTests(PathTests, util.ZipSetup, unittest.TestCase): + def test_remove_in_context_manager(self): + """ + It is not an error if the file that was temporarily stashed on the + file system is removed inside the `with` stanza. + """ + target = resources.files(self.data) / 'utf-8.file' + with resources.as_file(target) as path: + path.unlink() + + +if __name__ == '__main__': + unittest.main() diff --git a/lib/importlib_resources/tests/test_read.py b/lib/importlib_resources/tests/test_read.py new file mode 100644 index 00000000..b5aaec45 --- /dev/null +++ b/lib/importlib_resources/tests/test_read.py @@ -0,0 +1,82 @@ +import unittest +import importlib_resources as resources + +from . import data01 +from . import util +from importlib import import_module + + +class CommonBinaryTests(util.CommonTests, unittest.TestCase): + def execute(self, package, path): + resources.files(package).joinpath(path).read_bytes() + + +class CommonTextTests(util.CommonTests, unittest.TestCase): + def execute(self, package, path): + resources.files(package).joinpath(path).read_text(encoding='utf-8') + + +class ReadTests: + def test_read_bytes(self): + result = resources.files(self.data).joinpath('binary.file').read_bytes() + self.assertEqual(result, b'\0\1\2\3') + + def test_read_text_default_encoding(self): + result = ( + resources.files(self.data) + .joinpath('utf-8.file') + .read_text(encoding='utf-8') + ) + self.assertEqual(result, 'Hello, UTF-8 world!\n') + + def test_read_text_given_encoding(self): + result = ( + resources.files(self.data) + .joinpath('utf-16.file') + .read_text(encoding='utf-16') + ) + self.assertEqual(result, 'Hello, UTF-16 world!\n') + + def test_read_text_with_errors(self): + """ + Raises UnicodeError without the 'errors' argument. + """ + target = resources.files(self.data) / 'utf-16.file' + self.assertRaises(UnicodeError, target.read_text, encoding='utf-8') + result = target.read_text(encoding='utf-8', errors='ignore') + self.assertEqual( + result, + 'H\x00e\x00l\x00l\x00o\x00,\x00 ' + '\x00U\x00T\x00F\x00-\x001\x006\x00 ' + '\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00', + ) + + +class ReadDiskTests(ReadTests, unittest.TestCase): + data = data01 + + +class ReadZipTests(ReadTests, util.ZipSetup, unittest.TestCase): + def test_read_submodule_resource(self): + submodule = import_module('ziptestdata.subdirectory') + result = resources.files(submodule).joinpath('binary.file').read_bytes() + self.assertEqual(result, b'\0\1\2\3') + + def test_read_submodule_resource_by_name(self): + result = ( + resources.files('ziptestdata.subdirectory') + .joinpath('binary.file') + .read_bytes() + ) + self.assertEqual(result, b'\0\1\2\3') + + +class ReadNamespaceTests(ReadTests, unittest.TestCase): + def setUp(self): + from . import namespacedata01 + + self.data = namespacedata01 + + +if __name__ == '__main__': + unittest.main() diff --git a/lib/importlib_resources/tests/test_reader.py b/lib/importlib_resources/tests/test_reader.py new file mode 100644 index 00000000..e2bdf19c --- /dev/null +++ b/lib/importlib_resources/tests/test_reader.py @@ -0,0 +1,144 @@ +import os.path +import sys +import pathlib +import unittest + +from importlib import import_module +from importlib_resources.readers import MultiplexedPath, NamespaceReader + + +class MultiplexedPathTest(unittest.TestCase): + @classmethod + def setUpClass(cls): + path = pathlib.Path(__file__).parent / 'namespacedata01' + cls.folder = str(path) + + def test_init_no_paths(self): + with self.assertRaises(FileNotFoundError): + MultiplexedPath() + + def test_init_file(self): + with self.assertRaises(NotADirectoryError): + MultiplexedPath(os.path.join(self.folder, 'binary.file')) + + def test_iterdir(self): + contents = {path.name for path in MultiplexedPath(self.folder).iterdir()} + try: + contents.remove('__pycache__') + except (KeyError, ValueError): + pass + self.assertEqual(contents, {'binary.file', 'utf-16.file', 'utf-8.file'}) + + def test_iterdir_duplicate(self): + data01 = os.path.abspath(os.path.join(__file__, '..', 'data01')) + contents = { + path.name for path in MultiplexedPath(self.folder, data01).iterdir() + } + for remove in ('__pycache__', '__init__.pyc'): + try: + contents.remove(remove) + except (KeyError, ValueError): + pass + self.assertEqual( + contents, + {'__init__.py', 'binary.file', 'subdirectory', 'utf-16.file', 'utf-8.file'}, + ) + + def test_is_dir(self): + self.assertEqual(MultiplexedPath(self.folder).is_dir(), True) + + def test_is_file(self): + self.assertEqual(MultiplexedPath(self.folder).is_file(), False) + + def test_open_file(self): + path = MultiplexedPath(self.folder) + with self.assertRaises(FileNotFoundError): + path.read_bytes() + with self.assertRaises(FileNotFoundError): + path.read_text() + with self.assertRaises(FileNotFoundError): + path.open() + + def test_join_path(self): + prefix = os.path.abspath(os.path.join(__file__, '..')) + data01 = os.path.join(prefix, 'data01') + path = MultiplexedPath(self.folder, data01) + self.assertEqual( + str(path.joinpath('binary.file'))[len(prefix) + 1 :], + os.path.join('namespacedata01', 'binary.file'), + ) + self.assertEqual( + str(path.joinpath('subdirectory'))[len(prefix) + 1 :], + os.path.join('data01', 'subdirectory'), + ) + self.assertEqual( + str(path.joinpath('imaginary'))[len(prefix) + 1 :], + os.path.join('namespacedata01', 'imaginary'), + ) + self.assertEqual(path.joinpath(), path) + + def test_join_path_compound(self): + path = MultiplexedPath(self.folder) + assert not path.joinpath('imaginary/foo.py').exists() + + def test_join_path_common_subdir(self): + prefix = os.path.abspath(os.path.join(__file__, '..')) + data01 = os.path.join(prefix, 'data01') + data02 = os.path.join(prefix, 'data02') + path = MultiplexedPath(data01, data02) + self.assertIsInstance(path.joinpath('subdirectory'), MultiplexedPath) + self.assertEqual( + str(path.joinpath('subdirectory', 'subsubdir'))[len(prefix) + 1 :], + os.path.join('data02', 'subdirectory', 'subsubdir'), + ) + + def test_repr(self): + self.assertEqual( + repr(MultiplexedPath(self.folder)), + f"MultiplexedPath('{self.folder}')", + ) + + def test_name(self): + self.assertEqual( + MultiplexedPath(self.folder).name, + os.path.basename(self.folder), + ) + + +class NamespaceReaderTest(unittest.TestCase): + site_dir = str(pathlib.Path(__file__).parent) + + @classmethod + def setUpClass(cls): + sys.path.append(cls.site_dir) + + @classmethod + def tearDownClass(cls): + sys.path.remove(cls.site_dir) + + def test_init_error(self): + with self.assertRaises(ValueError): + NamespaceReader(['path1', 'path2']) + + def test_resource_path(self): + namespacedata01 = import_module('namespacedata01') + reader = NamespaceReader(namespacedata01.__spec__.submodule_search_locations) + + root = os.path.abspath(os.path.join(__file__, '..', 'namespacedata01')) + self.assertEqual( + reader.resource_path('binary.file'), os.path.join(root, 'binary.file') + ) + self.assertEqual( + reader.resource_path('imaginary'), os.path.join(root, 'imaginary') + ) + + def test_files(self): + namespacedata01 = import_module('namespacedata01') + reader = NamespaceReader(namespacedata01.__spec__.submodule_search_locations) + root = os.path.abspath(os.path.join(__file__, '..', 'namespacedata01')) + self.assertIsInstance(reader.files(), MultiplexedPath) + self.assertEqual(repr(reader.files()), f"MultiplexedPath('{root}')") + + +if __name__ == '__main__': + unittest.main() diff --git a/lib/importlib_resources/tests/test_resource.py b/lib/importlib_resources/tests/test_resource.py new file mode 100644 index 00000000..677110cd --- /dev/null +++ b/lib/importlib_resources/tests/test_resource.py @@ -0,0 +1,252 @@ +import contextlib +import sys +import unittest +import importlib_resources as resources +import uuid +import pathlib + +from . import data01 +from . import zipdata01, zipdata02 +from . import util +from importlib import import_module +from ._compat import import_helper, os_helper, unlink + + +class ResourceTests: + # Subclasses are expected to set the `data` attribute. + + def test_is_file_exists(self): + target = resources.files(self.data) / 'binary.file' + self.assertTrue(target.is_file()) + + def test_is_file_missing(self): + target = resources.files(self.data) / 'not-a-file' + self.assertFalse(target.is_file()) + + def test_is_dir(self): + target = resources.files(self.data) / 'subdirectory' + self.assertFalse(target.is_file()) + self.assertTrue(target.is_dir()) + + +class ResourceDiskTests(ResourceTests, unittest.TestCase): + def setUp(self): + self.data = data01 + + +class ResourceZipTests(ResourceTests, util.ZipSetup, unittest.TestCase): + pass + + +def names(traversable): + return {item.name for item in traversable.iterdir()} + + +class ResourceLoaderTests(unittest.TestCase): + def test_resource_contents(self): + package = util.create_package( + file=data01, path=data01.__file__, contents=['A', 'B', 'C'] + ) + self.assertEqual(names(resources.files(package)), {'A', 'B', 'C'}) + + def test_is_file(self): + package = util.create_package( + file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F'] + ) + self.assertTrue(resources.files(package).joinpath('B').is_file()) + + def test_is_dir(self): + package = util.create_package( + file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F'] + ) + self.assertTrue(resources.files(package).joinpath('D').is_dir()) + + def test_resource_missing(self): + package = util.create_package( + file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F'] + ) + self.assertFalse(resources.files(package).joinpath('Z').is_file()) + + +class ResourceCornerCaseTests(unittest.TestCase): + def test_package_has_no_reader_fallback(self): + """ + Test odd ball packages which: + # 1. Do not have a ResourceReader as a loader + # 2. Are not on the file system + # 3. Are not in a zip file + """ + module = util.create_package( + file=data01, path=data01.__file__, contents=['A', 'B', 'C'] + ) + # Give the module a dummy loader. + module.__loader__ = object() + # Give the module a dummy origin. + module.__file__ = '/path/which/shall/not/be/named' + module.__spec__.loader = module.__loader__ + module.__spec__.origin = module.__file__ + self.assertFalse(resources.files(module).joinpath('A').is_file()) + + +class ResourceFromZipsTest01(util.ZipSetupBase, unittest.TestCase): + ZIP_MODULE = zipdata01 # type: ignore + + def test_is_submodule_resource(self): + submodule = import_module('ziptestdata.subdirectory') + self.assertTrue(resources.files(submodule).joinpath('binary.file').is_file()) + + def test_read_submodule_resource_by_name(self): + self.assertTrue( + resources.files('ziptestdata.subdirectory') + .joinpath('binary.file') + .is_file() + ) + + def test_submodule_contents(self): + submodule = import_module('ziptestdata.subdirectory') + self.assertEqual( + names(resources.files(submodule)), {'__init__.py', 'binary.file'} + ) + + def test_submodule_contents_by_name(self): + self.assertEqual( + names(resources.files('ziptestdata.subdirectory')), + {'__init__.py', 'binary.file'}, + ) + + def test_as_file_directory(self): + with resources.as_file(resources.files('ziptestdata')) as data: + assert data.name == 'ziptestdata' + assert data.is_dir() + assert data.joinpath('subdirectory').is_dir() + assert len(list(data.iterdir())) + assert not data.parent.exists() + + +class ResourceFromZipsTest02(util.ZipSetupBase, unittest.TestCase): + ZIP_MODULE = zipdata02 # type: ignore + + def test_unrelated_contents(self): + """ + Test thata zip with two unrelated subpackages return + distinct resources. Ref python/importlib_resources#44. + """ + self.assertEqual( + names(resources.files('ziptestdata.one')), + {'__init__.py', 'resource1.txt'}, + ) + self.assertEqual( + names(resources.files('ziptestdata.two')), + {'__init__.py', 'resource2.txt'}, + ) + + +@contextlib.contextmanager +def zip_on_path(dir): + data_path = pathlib.Path(zipdata01.__file__) + source_zip_path = data_path.parent.joinpath('ziptestdata.zip') + zip_path = pathlib.Path(dir) / f'{uuid.uuid4()}.zip' + zip_path.write_bytes(source_zip_path.read_bytes()) + sys.path.append(str(zip_path)) + import_module('ziptestdata') + + try: + yield + finally: + with contextlib.suppress(ValueError): + sys.path.remove(str(zip_path)) + + with contextlib.suppress(KeyError): + del sys.path_importer_cache[str(zip_path)] + del sys.modules['ziptestdata'] + + with contextlib.suppress(OSError): + unlink(zip_path) + + +class DeletingZipsTest(unittest.TestCase): + """Having accessed resources in a zip file should not keep an open + reference to the zip. + """ + + def setUp(self): + self.fixtures = contextlib.ExitStack() + self.addCleanup(self.fixtures.close) + + modules = import_helper.modules_setup() + self.addCleanup(import_helper.modules_cleanup, *modules) + + temp_dir = self.fixtures.enter_context(os_helper.temp_dir()) + self.fixtures.enter_context(zip_on_path(temp_dir)) + + def test_iterdir_does_not_keep_open(self): + [item.name for item in resources.files('ziptestdata').iterdir()] + + def test_is_file_does_not_keep_open(self): + resources.files('ziptestdata').joinpath('binary.file').is_file() + + def test_is_file_failure_does_not_keep_open(self): + resources.files('ziptestdata').joinpath('not-present').is_file() + + @unittest.skip("Desired but not supported.") + def test_as_file_does_not_keep_open(self): # pragma: no cover + resources.as_file(resources.files('ziptestdata') / 'binary.file') + + def test_entered_path_does_not_keep_open(self): + """ + Mimic what certifi does on import to make its bundle + available for the process duration. + """ + resources.as_file(resources.files('ziptestdata') / 'binary.file').__enter__() + + def test_read_binary_does_not_keep_open(self): + resources.files('ziptestdata').joinpath('binary.file').read_bytes() + + def test_read_text_does_not_keep_open(self): + resources.files('ziptestdata').joinpath('utf-8.file').read_text( + encoding='utf-8' + ) + + +class ResourceFromNamespaceTest01(unittest.TestCase): + site_dir = str(pathlib.Path(__file__).parent) + + @classmethod + def setUpClass(cls): + sys.path.append(cls.site_dir) + + @classmethod + def tearDownClass(cls): + sys.path.remove(cls.site_dir) + + def test_is_submodule_resource(self): + self.assertTrue( + resources.files(import_module('namespacedata01')) + .joinpath('binary.file') + .is_file() + ) + + def test_read_submodule_resource_by_name(self): + self.assertTrue( + resources.files('namespacedata01').joinpath('binary.file').is_file() + ) + + def test_submodule_contents(self): + contents = names(resources.files(import_module('namespacedata01'))) + try: + contents.remove('__pycache__') + except KeyError: + pass + self.assertEqual(contents, {'binary.file', 'utf-8.file', 'utf-16.file'}) + + def test_submodule_contents_by_name(self): + contents = names(resources.files('namespacedata01')) + try: + contents.remove('__pycache__') + except KeyError: + pass + self.assertEqual(contents, {'binary.file', 'utf-8.file', 'utf-16.file'}) + + +if __name__ == '__main__': + unittest.main() diff --git a/lib/importlib_resources/tests/update-zips.py b/lib/importlib_resources/tests/update-zips.py new file mode 100644 index 00000000..231334aa --- /dev/null +++ b/lib/importlib_resources/tests/update-zips.py @@ -0,0 +1,53 @@ +""" +Generate the zip test data files. + +Run to build the tests/zipdataNN/ziptestdata.zip files from +files in tests/dataNN. + +Replaces the file with the working copy, but does commit anything +to the source repo. +""" + +import contextlib +import os +import pathlib +import zipfile + + +def main(): + """ + >>> from unittest import mock + >>> monkeypatch = getfixture('monkeypatch') + >>> monkeypatch.setattr(zipfile, 'ZipFile', mock.MagicMock()) + >>> print(); main() # print workaround for bpo-32509 + + ...data01... -> ziptestdata/... + ... + ...data02... -> ziptestdata/... + ... + """ + suffixes = '01', '02' + tuple(map(generate, suffixes)) + + +def generate(suffix): + root = pathlib.Path(__file__).parent.relative_to(os.getcwd()) + zfpath = root / f'zipdata{suffix}/ziptestdata.zip' + with zipfile.ZipFile(zfpath, 'w') as zf: + for src, rel in walk(root / f'data{suffix}'): + dst = 'ziptestdata' / pathlib.PurePosixPath(rel.as_posix()) + print(src, '->', dst) + zf.write(src, dst) + + +def walk(datapath): + for dirpath, dirnames, filenames in os.walk(datapath): + with contextlib.suppress(ValueError): + dirnames.remove('__pycache__') + for filename in filenames: + res = pathlib.Path(dirpath) / filename + rel = res.relative_to(datapath) + yield res, rel + + +__name__ == '__main__' and main() diff --git a/lib/importlib_resources/tests/util.py b/lib/importlib_resources/tests/util.py new file mode 100644 index 00000000..ce0e6fad --- /dev/null +++ b/lib/importlib_resources/tests/util.py @@ -0,0 +1,179 @@ +import abc +import importlib +import io +import sys +import types +import pathlib + +from . import data01 +from . import zipdata01 +from ..abc import ResourceReader +from ._compat import import_helper + + +from importlib.machinery import ModuleSpec + + +class Reader(ResourceReader): + def __init__(self, **kwargs): + vars(self).update(kwargs) + + def get_resource_reader(self, package): + return self + + def open_resource(self, path): + self._path = path + if isinstance(self.file, Exception): + raise self.file + return self.file + + def resource_path(self, path_): + self._path = path_ + if isinstance(self.path, Exception): + raise self.path + return self.path + + def is_resource(self, path_): + self._path = path_ + if isinstance(self.path, Exception): + raise self.path + + def part(entry): + return entry.split('/') + + return any( + len(parts) == 1 and parts[0] == path_ for parts in map(part, self._contents) + ) + + def contents(self): + if isinstance(self.path, Exception): + raise self.path + yield from self._contents + + +def create_package_from_loader(loader, is_package=True): + name = 'testingpackage' + module = types.ModuleType(name) + spec = ModuleSpec(name, loader, origin='does-not-exist', is_package=is_package) + module.__spec__ = spec + module.__loader__ = loader + return module + + +def create_package(file=None, path=None, is_package=True, contents=()): + return create_package_from_loader( + Reader(file=file, path=path, _contents=contents), + is_package, + ) + + +class CommonTests(metaclass=abc.ABCMeta): + """ + Tests shared by test_open, test_path, and test_read. + """ + + @abc.abstractmethod + def execute(self, package, path): + """ + Call the pertinent legacy API function (e.g. open_text, path) + on package and path. + """ + + def test_package_name(self): + """ + Passing in the package name should succeed. + """ + self.execute(data01.__name__, 'utf-8.file') + + def test_package_object(self): + """ + Passing in the package itself should succeed. + """ + self.execute(data01, 'utf-8.file') + + def test_string_path(self): + """ + Passing in a string for the path should succeed. + """ + path = 'utf-8.file' + self.execute(data01, path) + + def test_pathlib_path(self): + """ + Passing in a pathlib.PurePath object for the path should succeed. + """ + path = pathlib.PurePath('utf-8.file') + self.execute(data01, path) + + def test_importing_module_as_side_effect(self): + """ + The anchor package can already be imported. + """ + del sys.modules[data01.__name__] + self.execute(data01.__name__, 'utf-8.file') + + def test_missing_path(self): + """ + Attempting to open or read or request the path for a + non-existent path should succeed if open_resource + can return a viable data stream. + """ + bytes_data = io.BytesIO(b'Hello, world!') + package = create_package(file=bytes_data, path=FileNotFoundError()) + self.execute(package, 'utf-8.file') + self.assertEqual(package.__loader__._path, 'utf-8.file') + + def test_extant_path(self): + # Attempting to open or read or request the path when the + # path does exist should still succeed. Does not assert + # anything about the result. + bytes_data = io.BytesIO(b'Hello, world!') + # any path that exists + path = __file__ + package = create_package(file=bytes_data, path=path) + self.execute(package, 'utf-8.file') + self.assertEqual(package.__loader__._path, 'utf-8.file') + + def test_useless_loader(self): + package = create_package(file=FileNotFoundError(), path=FileNotFoundError()) + with self.assertRaises(FileNotFoundError): + self.execute(package, 'utf-8.file') + + +class ZipSetupBase: + ZIP_MODULE = None + + @classmethod + def setUpClass(cls): + data_path = pathlib.Path(cls.ZIP_MODULE.__file__) + data_dir = data_path.parent + cls._zip_path = str(data_dir / 'ziptestdata.zip') + sys.path.append(cls._zip_path) + cls.data = importlib.import_module('ziptestdata') + + @classmethod + def tearDownClass(cls): + try: + sys.path.remove(cls._zip_path) + except ValueError: + pass + + try: + del sys.path_importer_cache[cls._zip_path] + del sys.modules[cls.data.__name__] + except KeyError: + pass + + try: + del cls.data + del cls._zip_path + except AttributeError: + pass + + def setUp(self): + modules = import_helper.modules_setup() + self.addCleanup(import_helper.modules_cleanup, *modules) + + +class ZipSetup(ZipSetupBase): + ZIP_MODULE = zipdata01 # type: ignore diff --git a/lib/importlib_resources/tests/zipdata01/__init__.py b/lib/importlib_resources/tests/zipdata01/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/zipdata01/ziptestdata.zip b/lib/importlib_resources/tests/zipdata01/ziptestdata.zip new file mode 100644 index 00000000..9a3bb073 Binary files /dev/null and b/lib/importlib_resources/tests/zipdata01/ziptestdata.zip differ diff --git a/lib/importlib_resources/tests/zipdata02/__init__.py b/lib/importlib_resources/tests/zipdata02/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/lib/importlib_resources/tests/zipdata02/ziptestdata.zip b/lib/importlib_resources/tests/zipdata02/ziptestdata.zip new file mode 100644 index 00000000..d63ff512 Binary files /dev/null and b/lib/importlib_resources/tests/zipdata02/ziptestdata.zip differ diff --git a/requirements.txt b/requirements.txt index e71070e6..3fc87314 100644 --- a/requirements.txt +++ b/requirements.txt @@ -19,6 +19,8 @@ gntp==1.0.3 html5lib==1.1 httpagentparser==1.9.5 idna==3.4 +importlib-metadata==6.8.0 +importlib-resources==6.0.1 git+https://github.com/Tautulli/ipwhois.git@master#egg=ipwhois IPy==1.01 Mako==1.2.4