mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-07-07 21:51:14 -07:00
Add importlib-resources-5.2.2
This commit is contained in:
parent
b09faab852
commit
f238343aa2
40 changed files with 1943 additions and 0 deletions
36
lib/importlib_resources/__init__.py
Normal file
36
lib/importlib_resources/__init__.py
Normal file
|
@ -0,0 +1,36 @@
|
||||||
|
"""Read resources contained within a package."""
|
||||||
|
|
||||||
|
from ._common import (
|
||||||
|
as_file,
|
||||||
|
files,
|
||||||
|
Package,
|
||||||
|
Resource,
|
||||||
|
)
|
||||||
|
|
||||||
|
from ._legacy import (
|
||||||
|
contents,
|
||||||
|
open_binary,
|
||||||
|
read_binary,
|
||||||
|
open_text,
|
||||||
|
read_text,
|
||||||
|
is_resource,
|
||||||
|
path,
|
||||||
|
)
|
||||||
|
|
||||||
|
from importlib_resources.abc import ResourceReader
|
||||||
|
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'Package',
|
||||||
|
'Resource',
|
||||||
|
'ResourceReader',
|
||||||
|
'as_file',
|
||||||
|
'contents',
|
||||||
|
'files',
|
||||||
|
'is_resource',
|
||||||
|
'open_binary',
|
||||||
|
'open_text',
|
||||||
|
'path',
|
||||||
|
'read_binary',
|
||||||
|
'read_text',
|
||||||
|
]
|
170
lib/importlib_resources/_adapters.py
Normal file
170
lib/importlib_resources/_adapters.py
Normal file
|
@ -0,0 +1,170 @@
|
||||||
|
from contextlib import suppress
|
||||||
|
from io import TextIOWrapper
|
||||||
|
|
||||||
|
from . import abc
|
||||||
|
|
||||||
|
|
||||||
|
class SpecLoaderAdapter:
|
||||||
|
"""
|
||||||
|
Adapt a package spec to adapt the underlying loader.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, spec, adapter=lambda spec: spec.loader):
|
||||||
|
self.spec = spec
|
||||||
|
self.loader = adapter(spec)
|
||||||
|
|
||||||
|
def __getattr__(self, name):
|
||||||
|
return getattr(self.spec, name)
|
||||||
|
|
||||||
|
|
||||||
|
class TraversableResourcesLoader:
|
||||||
|
"""
|
||||||
|
Adapt a loader to provide TraversableResources.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, spec):
|
||||||
|
self.spec = spec
|
||||||
|
|
||||||
|
def get_resource_reader(self, name):
|
||||||
|
return CompatibilityFiles(self.spec)._native()
|
||||||
|
|
||||||
|
|
||||||
|
def _io_wrapper(file, mode='r', *args, **kwargs):
|
||||||
|
if mode == 'r':
|
||||||
|
return TextIOWrapper(file, *args, **kwargs)
|
||||||
|
elif mode == 'rb':
|
||||||
|
return file
|
||||||
|
raise ValueError(
|
||||||
|
"Invalid mode value '{}', only 'r' and 'rb' are supported".format(mode)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CompatibilityFiles:
|
||||||
|
"""
|
||||||
|
Adapter for an existing or non-existant resource reader
|
||||||
|
to provide a compability .files().
|
||||||
|
"""
|
||||||
|
|
||||||
|
class SpecPath(abc.Traversable):
|
||||||
|
"""
|
||||||
|
Path tied to a module spec.
|
||||||
|
Can be read and exposes the resource reader children.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, spec, reader):
|
||||||
|
self._spec = spec
|
||||||
|
self._reader = reader
|
||||||
|
|
||||||
|
def iterdir(self):
|
||||||
|
if not self._reader:
|
||||||
|
return iter(())
|
||||||
|
return iter(
|
||||||
|
CompatibilityFiles.ChildPath(self._reader, path)
|
||||||
|
for path in self._reader.contents()
|
||||||
|
)
|
||||||
|
|
||||||
|
def is_file(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
is_dir = is_file
|
||||||
|
|
||||||
|
def joinpath(self, other):
|
||||||
|
if not self._reader:
|
||||||
|
return CompatibilityFiles.OrphanPath(other)
|
||||||
|
return CompatibilityFiles.ChildPath(self._reader, other)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return self._spec.name
|
||||||
|
|
||||||
|
def open(self, mode='r', *args, **kwargs):
|
||||||
|
return _io_wrapper(self._reader.open_resource(None), mode, *args, **kwargs)
|
||||||
|
|
||||||
|
class ChildPath(abc.Traversable):
|
||||||
|
"""
|
||||||
|
Path tied to a resource reader child.
|
||||||
|
Can be read but doesn't expose any meaningfull children.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, reader, name):
|
||||||
|
self._reader = reader
|
||||||
|
self._name = name
|
||||||
|
|
||||||
|
def iterdir(self):
|
||||||
|
return iter(())
|
||||||
|
|
||||||
|
def is_file(self):
|
||||||
|
return self._reader.is_resource(self.name)
|
||||||
|
|
||||||
|
def is_dir(self):
|
||||||
|
return not self.is_file()
|
||||||
|
|
||||||
|
def joinpath(self, other):
|
||||||
|
return CompatibilityFiles.OrphanPath(self.name, other)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return self._name
|
||||||
|
|
||||||
|
def open(self, mode='r', *args, **kwargs):
|
||||||
|
return _io_wrapper(
|
||||||
|
self._reader.open_resource(self.name), mode, *args, **kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
class OrphanPath(abc.Traversable):
|
||||||
|
"""
|
||||||
|
Orphan path, not tied to a module spec or resource reader.
|
||||||
|
Can't be read and doesn't expose any meaningful children.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *path_parts):
|
||||||
|
if len(path_parts) < 1:
|
||||||
|
raise ValueError('Need at least one path part to construct a path')
|
||||||
|
self._path = path_parts
|
||||||
|
|
||||||
|
def iterdir(self):
|
||||||
|
return iter(())
|
||||||
|
|
||||||
|
def is_file(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
is_dir = is_file
|
||||||
|
|
||||||
|
def joinpath(self, other):
|
||||||
|
return CompatibilityFiles.OrphanPath(*self._path, other)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return self._path[-1]
|
||||||
|
|
||||||
|
def open(self, mode='r', *args, **kwargs):
|
||||||
|
raise FileNotFoundError("Can't open orphan path")
|
||||||
|
|
||||||
|
def __init__(self, spec):
|
||||||
|
self.spec = spec
|
||||||
|
|
||||||
|
@property
|
||||||
|
def _reader(self):
|
||||||
|
with suppress(AttributeError):
|
||||||
|
return self.spec.loader.get_resource_reader(self.spec.name)
|
||||||
|
|
||||||
|
def _native(self):
|
||||||
|
"""
|
||||||
|
Return the native reader if it supports files().
|
||||||
|
"""
|
||||||
|
reader = self._reader
|
||||||
|
return reader if hasattr(reader, 'files') else self
|
||||||
|
|
||||||
|
def __getattr__(self, attr):
|
||||||
|
return getattr(self._reader, attr)
|
||||||
|
|
||||||
|
def files(self):
|
||||||
|
return CompatibilityFiles.SpecPath(self.spec, self._reader)
|
||||||
|
|
||||||
|
|
||||||
|
def wrap_spec(package):
|
||||||
|
"""
|
||||||
|
Construct a package spec with traversable compatibility
|
||||||
|
on the spec/loader/reader.
|
||||||
|
"""
|
||||||
|
return SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)
|
118
lib/importlib_resources/_common.py
Normal file
118
lib/importlib_resources/_common.py
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
import tempfile
|
||||||
|
import functools
|
||||||
|
import contextlib
|
||||||
|
import types
|
||||||
|
import importlib
|
||||||
|
|
||||||
|
from typing import Union, Any, Optional
|
||||||
|
from .abc import ResourceReader, Traversable
|
||||||
|
|
||||||
|
from ._compat import wrap_spec
|
||||||
|
|
||||||
|
Package = Union[types.ModuleType, str]
|
||||||
|
Resource = Union[str, os.PathLike]
|
||||||
|
|
||||||
|
|
||||||
|
def files(package):
|
||||||
|
# type: (Package) -> Traversable
|
||||||
|
"""
|
||||||
|
Get a Traversable resource from a package
|
||||||
|
"""
|
||||||
|
return from_package(get_package(package))
|
||||||
|
|
||||||
|
|
||||||
|
def normalize_path(path):
|
||||||
|
# type: (Any) -> str
|
||||||
|
"""Normalize a path by ensuring it is a string.
|
||||||
|
|
||||||
|
If the resulting string contains path separators, an exception is raised.
|
||||||
|
"""
|
||||||
|
str_path = str(path)
|
||||||
|
parent, file_name = os.path.split(str_path)
|
||||||
|
if parent:
|
||||||
|
raise ValueError(f'{path!r} must be only a file name')
|
||||||
|
return file_name
|
||||||
|
|
||||||
|
|
||||||
|
def get_resource_reader(package):
|
||||||
|
# type: (types.ModuleType) -> Optional[ResourceReader]
|
||||||
|
"""
|
||||||
|
Return the package's loader if it's a ResourceReader.
|
||||||
|
"""
|
||||||
|
# We can't use
|
||||||
|
# a issubclass() check here because apparently abc.'s __subclasscheck__()
|
||||||
|
# hook wants to create a weak reference to the object, but
|
||||||
|
# zipimport.zipimporter does not support weak references, resulting in a
|
||||||
|
# TypeError. That seems terrible.
|
||||||
|
spec = package.__spec__
|
||||||
|
reader = getattr(spec.loader, 'get_resource_reader', None) # type: ignore
|
||||||
|
if reader is None:
|
||||||
|
return None
|
||||||
|
return reader(spec.name) # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
def resolve(cand):
|
||||||
|
# type: (Package) -> types.ModuleType
|
||||||
|
return cand if isinstance(cand, types.ModuleType) else importlib.import_module(cand)
|
||||||
|
|
||||||
|
|
||||||
|
def get_package(package):
|
||||||
|
# type: (Package) -> types.ModuleType
|
||||||
|
"""Take a package name or module object and return the module.
|
||||||
|
|
||||||
|
Raise an exception if the resolved module is not a package.
|
||||||
|
"""
|
||||||
|
resolved = resolve(package)
|
||||||
|
if wrap_spec(resolved).submodule_search_locations is None:
|
||||||
|
raise TypeError(f'{package!r} is not a package')
|
||||||
|
return resolved
|
||||||
|
|
||||||
|
|
||||||
|
def from_package(package):
|
||||||
|
"""
|
||||||
|
Return a Traversable object for the given package.
|
||||||
|
|
||||||
|
"""
|
||||||
|
spec = wrap_spec(package)
|
||||||
|
reader = spec.loader.get_resource_reader(spec.name)
|
||||||
|
return reader.files()
|
||||||
|
|
||||||
|
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _tempfile(reader, suffix=''):
|
||||||
|
# Not using tempfile.NamedTemporaryFile as it leads to deeper 'try'
|
||||||
|
# blocks due to the need to close the temporary file to work on Windows
|
||||||
|
# properly.
|
||||||
|
fd, raw_path = tempfile.mkstemp(suffix=suffix)
|
||||||
|
try:
|
||||||
|
try:
|
||||||
|
os.write(fd, reader())
|
||||||
|
finally:
|
||||||
|
os.close(fd)
|
||||||
|
del reader
|
||||||
|
yield pathlib.Path(raw_path)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
os.remove(raw_path)
|
||||||
|
except FileNotFoundError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@functools.singledispatch
|
||||||
|
def as_file(path):
|
||||||
|
"""
|
||||||
|
Given a Traversable object, return that object as a
|
||||||
|
path on the local file system in a context manager.
|
||||||
|
"""
|
||||||
|
return _tempfile(path.read_bytes, suffix=path.name)
|
||||||
|
|
||||||
|
|
||||||
|
@as_file.register(pathlib.Path)
|
||||||
|
@contextlib.contextmanager
|
||||||
|
def _(path):
|
||||||
|
"""
|
||||||
|
Degenerate behavior for pathlib.Path objects.
|
||||||
|
"""
|
||||||
|
yield path
|
98
lib/importlib_resources/_compat.py
Normal file
98
lib/importlib_resources/_compat.py
Normal file
|
@ -0,0 +1,98 @@
|
||||||
|
# flake8: noqa
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import sys
|
||||||
|
import pathlib
|
||||||
|
from contextlib import suppress
|
||||||
|
|
||||||
|
if sys.version_info >= (3, 10):
|
||||||
|
from zipfile import Path as ZipPath # type: ignore
|
||||||
|
else:
|
||||||
|
from zipp import Path as ZipPath # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from typing import runtime_checkable # type: ignore
|
||||||
|
except ImportError:
|
||||||
|
|
||||||
|
def runtime_checkable(cls): # type: ignore
|
||||||
|
return cls
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from typing import Protocol # type: ignore
|
||||||
|
except ImportError:
|
||||||
|
Protocol = abc.ABC # type: ignore
|
||||||
|
|
||||||
|
|
||||||
|
class TraversableResourcesLoader:
|
||||||
|
"""
|
||||||
|
Adapt loaders to provide TraversableResources and other
|
||||||
|
compatibility.
|
||||||
|
|
||||||
|
Used primarily for Python 3.9 and earlier where the native
|
||||||
|
loaders do not yet implement TraversableResources.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, spec):
|
||||||
|
self.spec = spec
|
||||||
|
|
||||||
|
@property
|
||||||
|
def path(self):
|
||||||
|
return self.spec.origin
|
||||||
|
|
||||||
|
def get_resource_reader(self, name):
|
||||||
|
from . import readers, _adapters
|
||||||
|
|
||||||
|
def _zip_reader(spec):
|
||||||
|
with suppress(AttributeError):
|
||||||
|
return readers.ZipReader(spec.loader, spec.name)
|
||||||
|
|
||||||
|
def _namespace_reader(spec):
|
||||||
|
with suppress(AttributeError, ValueError):
|
||||||
|
return readers.NamespaceReader(spec.submodule_search_locations)
|
||||||
|
|
||||||
|
def _available_reader(spec):
|
||||||
|
with suppress(AttributeError):
|
||||||
|
return spec.loader.get_resource_reader(spec.name)
|
||||||
|
|
||||||
|
def _native_reader(spec):
|
||||||
|
reader = _available_reader(spec)
|
||||||
|
return reader if hasattr(reader, 'files') else None
|
||||||
|
|
||||||
|
def _file_reader(spec):
|
||||||
|
try:
|
||||||
|
path = pathlib.Path(self.path)
|
||||||
|
except TypeError:
|
||||||
|
return None
|
||||||
|
if path.exists():
|
||||||
|
return readers.FileReader(self)
|
||||||
|
|
||||||
|
return (
|
||||||
|
# native reader if it supplies 'files'
|
||||||
|
_native_reader(self.spec)
|
||||||
|
or
|
||||||
|
# local ZipReader if a zip module
|
||||||
|
_zip_reader(self.spec)
|
||||||
|
or
|
||||||
|
# local NamespaceReader if a namespace module
|
||||||
|
_namespace_reader(self.spec)
|
||||||
|
or
|
||||||
|
# local FileReader
|
||||||
|
_file_reader(self.spec)
|
||||||
|
# fallback - adapt the spec ResourceReader to TraversableReader
|
||||||
|
or _adapters.CompatibilityFiles(self.spec)
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def wrap_spec(package):
|
||||||
|
"""
|
||||||
|
Construct a package spec with traversable compatibility
|
||||||
|
on the spec/loader/reader.
|
||||||
|
|
||||||
|
Supersedes _adapters.wrap_spec to use TraversableResourcesLoader
|
||||||
|
from above for older Python compatibility (<3.10).
|
||||||
|
"""
|
||||||
|
from . import _adapters
|
||||||
|
|
||||||
|
return _adapters.SpecLoaderAdapter(package.__spec__, TraversableResourcesLoader)
|
19
lib/importlib_resources/_itertools.py
Normal file
19
lib/importlib_resources/_itertools.py
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
from itertools import filterfalse
|
||||||
|
|
||||||
|
|
||||||
|
def unique_everseen(iterable, key=None):
|
||||||
|
"List unique elements, preserving order. Remember all elements ever seen."
|
||||||
|
# unique_everseen('AAAABBBCCDAABBB') --> A B C D
|
||||||
|
# unique_everseen('ABBCcAD', str.lower) --> A B C D
|
||||||
|
seen = set()
|
||||||
|
seen_add = seen.add
|
||||||
|
if key is None:
|
||||||
|
for element in filterfalse(seen.__contains__, iterable):
|
||||||
|
seen_add(element)
|
||||||
|
yield element
|
||||||
|
else:
|
||||||
|
for element in iterable:
|
||||||
|
k = key(element)
|
||||||
|
if k not in seen:
|
||||||
|
seen_add(k)
|
||||||
|
yield element
|
84
lib/importlib_resources/_legacy.py
Normal file
84
lib/importlib_resources/_legacy.py
Normal file
|
@ -0,0 +1,84 @@
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
import types
|
||||||
|
|
||||||
|
from typing import Union, Iterable, ContextManager, BinaryIO, TextIO
|
||||||
|
|
||||||
|
from . import _common
|
||||||
|
|
||||||
|
Package = Union[types.ModuleType, str]
|
||||||
|
Resource = Union[str, os.PathLike]
|
||||||
|
|
||||||
|
|
||||||
|
def open_binary(package: Package, resource: Resource) -> BinaryIO:
|
||||||
|
"""Return a file-like object opened for binary reading of the resource."""
|
||||||
|
return (_common.files(package) / _common.normalize_path(resource)).open('rb')
|
||||||
|
|
||||||
|
|
||||||
|
def read_binary(package: Package, resource: Resource) -> bytes:
|
||||||
|
"""Return the binary contents of the resource."""
|
||||||
|
return (_common.files(package) / _common.normalize_path(resource)).read_bytes()
|
||||||
|
|
||||||
|
|
||||||
|
def open_text(
|
||||||
|
package: Package,
|
||||||
|
resource: Resource,
|
||||||
|
encoding: str = 'utf-8',
|
||||||
|
errors: str = 'strict',
|
||||||
|
) -> TextIO:
|
||||||
|
"""Return a file-like object opened for text reading of the resource."""
|
||||||
|
return (_common.files(package) / _common.normalize_path(resource)).open(
|
||||||
|
'r', encoding=encoding, errors=errors
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def read_text(
|
||||||
|
package: Package,
|
||||||
|
resource: Resource,
|
||||||
|
encoding: str = 'utf-8',
|
||||||
|
errors: str = 'strict',
|
||||||
|
) -> str:
|
||||||
|
"""Return the decoded string of the resource.
|
||||||
|
|
||||||
|
The decoding-related arguments have the same semantics as those of
|
||||||
|
bytes.decode().
|
||||||
|
"""
|
||||||
|
with open_text(package, resource, encoding, errors) as fp:
|
||||||
|
return fp.read()
|
||||||
|
|
||||||
|
|
||||||
|
def contents(package: Package) -> Iterable[str]:
|
||||||
|
"""Return an iterable of entries in `package`.
|
||||||
|
|
||||||
|
Note that not all entries are resources. Specifically, directories are
|
||||||
|
not considered resources. Use `is_resource()` on each entry returned here
|
||||||
|
to check if it is a resource or not.
|
||||||
|
"""
|
||||||
|
return [path.name for path in _common.files(package).iterdir()]
|
||||||
|
|
||||||
|
|
||||||
|
def is_resource(package: Package, name: str) -> bool:
|
||||||
|
"""True if `name` is a resource inside `package`.
|
||||||
|
|
||||||
|
Directories are *not* resources.
|
||||||
|
"""
|
||||||
|
resource = _common.normalize_path(name)
|
||||||
|
return any(
|
||||||
|
traversable.name == resource and traversable.is_file()
|
||||||
|
for traversable in _common.files(package).iterdir()
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def path(
|
||||||
|
package: Package,
|
||||||
|
resource: Resource,
|
||||||
|
) -> ContextManager[pathlib.Path]:
|
||||||
|
"""A context manager providing a file path object to the resource.
|
||||||
|
|
||||||
|
If the resource does not already exist on its own on the file system,
|
||||||
|
a temporary file will be created. If the file was created, the file
|
||||||
|
will be deleted upon exiting the context manager (no exception is
|
||||||
|
raised if the file was deleted prior to the context manager
|
||||||
|
exiting).
|
||||||
|
"""
|
||||||
|
return _common.as_file(_common.files(package) / _common.normalize_path(resource))
|
137
lib/importlib_resources/abc.py
Normal file
137
lib/importlib_resources/abc.py
Normal file
|
@ -0,0 +1,137 @@
|
||||||
|
import abc
|
||||||
|
from typing import BinaryIO, Iterable, Text
|
||||||
|
|
||||||
|
from ._compat import runtime_checkable, Protocol
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceReader(metaclass=abc.ABCMeta):
|
||||||
|
"""Abstract base class for loaders to provide resource reading support."""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def open_resource(self, resource: Text) -> BinaryIO:
|
||||||
|
"""Return an opened, file-like object for binary reading.
|
||||||
|
|
||||||
|
The 'resource' argument is expected to represent only a file name.
|
||||||
|
If the resource cannot be found, FileNotFoundError is raised.
|
||||||
|
"""
|
||||||
|
# This deliberately raises FileNotFoundError instead of
|
||||||
|
# NotImplementedError so that if this method is accidentally called,
|
||||||
|
# it'll still do the right thing.
|
||||||
|
raise FileNotFoundError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def resource_path(self, resource: Text) -> Text:
|
||||||
|
"""Return the file system path to the specified resource.
|
||||||
|
|
||||||
|
The 'resource' argument is expected to represent only a file name.
|
||||||
|
If the resource does not exist on the file system, raise
|
||||||
|
FileNotFoundError.
|
||||||
|
"""
|
||||||
|
# This deliberately raises FileNotFoundError instead of
|
||||||
|
# NotImplementedError so that if this method is accidentally called,
|
||||||
|
# it'll still do the right thing.
|
||||||
|
raise FileNotFoundError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def is_resource(self, path: Text) -> bool:
|
||||||
|
"""Return True if the named 'path' is a resource.
|
||||||
|
|
||||||
|
Files are resources, directories are not.
|
||||||
|
"""
|
||||||
|
raise FileNotFoundError
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def contents(self) -> Iterable[str]:
|
||||||
|
"""Return an iterable of entries in `package`."""
|
||||||
|
raise FileNotFoundError
|
||||||
|
|
||||||
|
|
||||||
|
@runtime_checkable
|
||||||
|
class Traversable(Protocol):
|
||||||
|
"""
|
||||||
|
An object with a subset of pathlib.Path methods suitable for
|
||||||
|
traversing directories and opening files.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def iterdir(self):
|
||||||
|
"""
|
||||||
|
Yield Traversable objects in self
|
||||||
|
"""
|
||||||
|
|
||||||
|
def read_bytes(self):
|
||||||
|
"""
|
||||||
|
Read contents of self as bytes
|
||||||
|
"""
|
||||||
|
with self.open('rb') as strm:
|
||||||
|
return strm.read()
|
||||||
|
|
||||||
|
def read_text(self, encoding=None):
|
||||||
|
"""
|
||||||
|
Read contents of self as text
|
||||||
|
"""
|
||||||
|
with self.open(encoding=encoding) as strm:
|
||||||
|
return strm.read()
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def is_dir(self) -> bool:
|
||||||
|
"""
|
||||||
|
Return True if self is a dir
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def is_file(self) -> bool:
|
||||||
|
"""
|
||||||
|
Return True if self is a file
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def joinpath(self, child):
|
||||||
|
"""
|
||||||
|
Return Traversable child in self
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __truediv__(self, child):
|
||||||
|
"""
|
||||||
|
Return Traversable child in self
|
||||||
|
"""
|
||||||
|
return self.joinpath(child)
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def open(self, mode='r', *args, **kwargs):
|
||||||
|
"""
|
||||||
|
mode may be 'r' or 'rb' to open as text or binary. Return a handle
|
||||||
|
suitable for reading (same as pathlib.Path.open).
|
||||||
|
|
||||||
|
When opening as text, accepts encoding parameters such as those
|
||||||
|
accepted by io.TextIOWrapper.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractproperty
|
||||||
|
def name(self) -> str:
|
||||||
|
"""
|
||||||
|
The base name of this object without any parent references.
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
class TraversableResources(ResourceReader):
|
||||||
|
"""
|
||||||
|
The required interface for providing traversable
|
||||||
|
resources.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def files(self):
|
||||||
|
"""Return a Traversable object for the loaded package."""
|
||||||
|
|
||||||
|
def open_resource(self, resource):
|
||||||
|
return self.files().joinpath(resource).open('rb')
|
||||||
|
|
||||||
|
def resource_path(self, resource):
|
||||||
|
raise FileNotFoundError(resource)
|
||||||
|
|
||||||
|
def is_resource(self, path):
|
||||||
|
return self.files().joinpath(path).is_file()
|
||||||
|
|
||||||
|
def contents(self):
|
||||||
|
return (item.name for item in self.files().iterdir())
|
0
lib/importlib_resources/py.typed
Normal file
0
lib/importlib_resources/py.typed
Normal file
122
lib/importlib_resources/readers.py
Normal file
122
lib/importlib_resources/readers.py
Normal file
|
@ -0,0 +1,122 @@
|
||||||
|
import collections
|
||||||
|
import pathlib
|
||||||
|
import operator
|
||||||
|
|
||||||
|
from . import abc
|
||||||
|
|
||||||
|
from ._itertools import unique_everseen
|
||||||
|
from ._compat import ZipPath
|
||||||
|
|
||||||
|
|
||||||
|
def remove_duplicates(items):
|
||||||
|
return iter(collections.OrderedDict.fromkeys(items))
|
||||||
|
|
||||||
|
|
||||||
|
class FileReader(abc.TraversableResources):
|
||||||
|
def __init__(self, loader):
|
||||||
|
self.path = pathlib.Path(loader.path).parent
|
||||||
|
|
||||||
|
def resource_path(self, resource):
|
||||||
|
"""
|
||||||
|
Return the file system path to prevent
|
||||||
|
`resources.path()` from creating a temporary
|
||||||
|
copy.
|
||||||
|
"""
|
||||||
|
return str(self.path.joinpath(resource))
|
||||||
|
|
||||||
|
def files(self):
|
||||||
|
return self.path
|
||||||
|
|
||||||
|
|
||||||
|
class ZipReader(abc.TraversableResources):
|
||||||
|
def __init__(self, loader, module):
|
||||||
|
_, _, name = module.rpartition('.')
|
||||||
|
self.prefix = loader.prefix.replace('\\', '/') + name + '/'
|
||||||
|
self.archive = loader.archive
|
||||||
|
|
||||||
|
def open_resource(self, resource):
|
||||||
|
try:
|
||||||
|
return super().open_resource(resource)
|
||||||
|
except KeyError as exc:
|
||||||
|
raise FileNotFoundError(exc.args[0])
|
||||||
|
|
||||||
|
def is_resource(self, path):
|
||||||
|
# workaround for `zipfile.Path.is_file` returning true
|
||||||
|
# for non-existent paths.
|
||||||
|
target = self.files().joinpath(path)
|
||||||
|
return target.is_file() and target.exists()
|
||||||
|
|
||||||
|
def files(self):
|
||||||
|
return ZipPath(self.archive, self.prefix)
|
||||||
|
|
||||||
|
|
||||||
|
class MultiplexedPath(abc.Traversable):
|
||||||
|
"""
|
||||||
|
Given a series of Traversable objects, implement a merged
|
||||||
|
version of the interface across all objects. Useful for
|
||||||
|
namespace packages which may be multihomed at a single
|
||||||
|
name.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, *paths):
|
||||||
|
self._paths = list(map(pathlib.Path, remove_duplicates(paths)))
|
||||||
|
if not self._paths:
|
||||||
|
message = 'MultiplexedPath must contain at least one path'
|
||||||
|
raise FileNotFoundError(message)
|
||||||
|
if not all(path.is_dir() for path in self._paths):
|
||||||
|
raise NotADirectoryError('MultiplexedPath only supports directories')
|
||||||
|
|
||||||
|
def iterdir(self):
|
||||||
|
files = (file for path in self._paths for file in path.iterdir())
|
||||||
|
return unique_everseen(files, key=operator.attrgetter('name'))
|
||||||
|
|
||||||
|
def read_bytes(self):
|
||||||
|
raise FileNotFoundError(f'{self} is not a file')
|
||||||
|
|
||||||
|
def read_text(self, *args, **kwargs):
|
||||||
|
raise FileNotFoundError(f'{self} is not a file')
|
||||||
|
|
||||||
|
def is_dir(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def is_file(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def joinpath(self, child):
|
||||||
|
# first try to find child in current paths
|
||||||
|
for file in self.iterdir():
|
||||||
|
if file.name == child:
|
||||||
|
return file
|
||||||
|
# if it does not exist, construct it with the first path
|
||||||
|
return self._paths[0] / child
|
||||||
|
|
||||||
|
__truediv__ = joinpath
|
||||||
|
|
||||||
|
def open(self, *args, **kwargs):
|
||||||
|
raise FileNotFoundError(f'{self} is not a file')
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return self._paths[0].name
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
paths = ', '.join(f"'{path}'" for path in self._paths)
|
||||||
|
return f'MultiplexedPath({paths})'
|
||||||
|
|
||||||
|
|
||||||
|
class NamespaceReader(abc.TraversableResources):
|
||||||
|
def __init__(self, namespace_path):
|
||||||
|
if 'NamespacePath' not in str(namespace_path):
|
||||||
|
raise ValueError('Invalid path')
|
||||||
|
self.path = MultiplexedPath(*list(namespace_path))
|
||||||
|
|
||||||
|
def resource_path(self, resource):
|
||||||
|
"""
|
||||||
|
Return the file system path to prevent
|
||||||
|
`resources.path()` from creating a temporary
|
||||||
|
copy.
|
||||||
|
"""
|
||||||
|
return str(self.path.joinpath(resource))
|
||||||
|
|
||||||
|
def files(self):
|
||||||
|
return self.path
|
116
lib/importlib_resources/simple.py
Normal file
116
lib/importlib_resources/simple.py
Normal file
|
@ -0,0 +1,116 @@
|
||||||
|
"""
|
||||||
|
Interface adapters for low-level readers.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import abc
|
||||||
|
import io
|
||||||
|
import itertools
|
||||||
|
from typing import BinaryIO, List
|
||||||
|
|
||||||
|
from .abc import Traversable, TraversableResources
|
||||||
|
|
||||||
|
|
||||||
|
class SimpleReader(abc.ABC):
|
||||||
|
"""
|
||||||
|
The minimum, low-level interface required from a resource
|
||||||
|
provider.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractproperty
|
||||||
|
def package(self):
|
||||||
|
# type: () -> str
|
||||||
|
"""
|
||||||
|
The name of the package for which this reader loads resources.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def children(self):
|
||||||
|
# type: () -> List['SimpleReader']
|
||||||
|
"""
|
||||||
|
Obtain an iterable of SimpleReader for available
|
||||||
|
child containers (e.g. directories).
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def resources(self):
|
||||||
|
# type: () -> List[str]
|
||||||
|
"""
|
||||||
|
Obtain available named resources for this virtual package.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def open_binary(self, resource):
|
||||||
|
# type: (str) -> BinaryIO
|
||||||
|
"""
|
||||||
|
Obtain a File-like for a named resource.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@property
|
||||||
|
def name(self):
|
||||||
|
return self.package.split('.')[-1]
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceHandle(Traversable):
|
||||||
|
"""
|
||||||
|
Handle to a named resource in a ResourceReader.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, parent, name):
|
||||||
|
# type: (ResourceContainer, str) -> None
|
||||||
|
self.parent = parent
|
||||||
|
self.name = name # type: ignore
|
||||||
|
|
||||||
|
def is_file(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def is_dir(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def open(self, mode='r', *args, **kwargs):
|
||||||
|
stream = self.parent.reader.open_binary(self.name)
|
||||||
|
if 'b' not in mode:
|
||||||
|
stream = io.TextIOWrapper(*args, **kwargs)
|
||||||
|
return stream
|
||||||
|
|
||||||
|
def joinpath(self, name):
|
||||||
|
raise RuntimeError("Cannot traverse into a resource")
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceContainer(Traversable):
|
||||||
|
"""
|
||||||
|
Traversable container for a package's resources via its reader.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def __init__(self, reader):
|
||||||
|
# type: (SimpleReader) -> None
|
||||||
|
self.reader = reader
|
||||||
|
|
||||||
|
def is_dir(self):
|
||||||
|
return True
|
||||||
|
|
||||||
|
def is_file(self):
|
||||||
|
return False
|
||||||
|
|
||||||
|
def iterdir(self):
|
||||||
|
files = (ResourceHandle(self, name) for name in self.reader.resources)
|
||||||
|
dirs = map(ResourceContainer, self.reader.children())
|
||||||
|
return itertools.chain(files, dirs)
|
||||||
|
|
||||||
|
def open(self, *args, **kwargs):
|
||||||
|
raise IsADirectoryError()
|
||||||
|
|
||||||
|
def joinpath(self, name):
|
||||||
|
return next(
|
||||||
|
traversable for traversable in self.iterdir() if traversable.name == name
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TraversableReader(TraversableResources, SimpleReader):
|
||||||
|
"""
|
||||||
|
A TraversableResources based on SimpleReader. Resource providers
|
||||||
|
may derive from this class to provide the TraversableResources
|
||||||
|
interface by supplying the SimpleReader interface.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def files(self):
|
||||||
|
return ResourceContainer(self)
|
0
lib/importlib_resources/tests/__init__.py
Normal file
0
lib/importlib_resources/tests/__init__.py
Normal file
19
lib/importlib_resources/tests/_compat.py
Normal file
19
lib/importlib_resources/tests/_compat.py
Normal file
|
@ -0,0 +1,19 @@
|
||||||
|
import os
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
from test.support import import_helper # type: ignore
|
||||||
|
except ImportError:
|
||||||
|
# Python 3.9 and earlier
|
||||||
|
class import_helper: # type: ignore
|
||||||
|
from test.support import modules_setup, modules_cleanup
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Python 3.10
|
||||||
|
from test.support.os_helper import unlink
|
||||||
|
except ImportError:
|
||||||
|
from test.support import unlink as _unlink
|
||||||
|
|
||||||
|
def unlink(target):
|
||||||
|
return _unlink(os.fspath(target))
|
0
lib/importlib_resources/tests/data01/__init__.py
Normal file
0
lib/importlib_resources/tests/data01/__init__.py
Normal file
BIN
lib/importlib_resources/tests/data01/binary.file
Normal file
BIN
lib/importlib_resources/tests/data01/binary.file
Normal file
Binary file not shown.
BIN
lib/importlib_resources/tests/data01/subdirectory/binary.file
Normal file
BIN
lib/importlib_resources/tests/data01/subdirectory/binary.file
Normal file
Binary file not shown.
BIN
lib/importlib_resources/tests/data01/utf-16.file
Normal file
BIN
lib/importlib_resources/tests/data01/utf-16.file
Normal file
Binary file not shown.
1
lib/importlib_resources/tests/data01/utf-8.file
Normal file
1
lib/importlib_resources/tests/data01/utf-8.file
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Hello, UTF-8 world!
|
0
lib/importlib_resources/tests/data02/__init__.py
Normal file
0
lib/importlib_resources/tests/data02/__init__.py
Normal file
0
lib/importlib_resources/tests/data02/one/__init__.py
Normal file
0
lib/importlib_resources/tests/data02/one/__init__.py
Normal file
1
lib/importlib_resources/tests/data02/one/resource1.txt
Normal file
1
lib/importlib_resources/tests/data02/one/resource1.txt
Normal file
|
@ -0,0 +1 @@
|
||||||
|
one resource
|
0
lib/importlib_resources/tests/data02/two/__init__.py
Normal file
0
lib/importlib_resources/tests/data02/two/__init__.py
Normal file
1
lib/importlib_resources/tests/data02/two/resource2.txt
Normal file
1
lib/importlib_resources/tests/data02/two/resource2.txt
Normal file
|
@ -0,0 +1 @@
|
||||||
|
two resource
|
BIN
lib/importlib_resources/tests/namespacedata01/binary.file
Normal file
BIN
lib/importlib_resources/tests/namespacedata01/binary.file
Normal file
Binary file not shown.
BIN
lib/importlib_resources/tests/namespacedata01/utf-16.file
Normal file
BIN
lib/importlib_resources/tests/namespacedata01/utf-16.file
Normal file
Binary file not shown.
1
lib/importlib_resources/tests/namespacedata01/utf-8.file
Normal file
1
lib/importlib_resources/tests/namespacedata01/utf-8.file
Normal file
|
@ -0,0 +1 @@
|
||||||
|
Hello, UTF-8 world!
|
102
lib/importlib_resources/tests/test_compatibilty_files.py
Normal file
102
lib/importlib_resources/tests/test_compatibilty_files.py
Normal file
|
@ -0,0 +1,102 @@
|
||||||
|
import io
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import importlib_resources as resources
|
||||||
|
|
||||||
|
from importlib_resources._adapters import (
|
||||||
|
CompatibilityFiles,
|
||||||
|
wrap_spec,
|
||||||
|
)
|
||||||
|
|
||||||
|
from . import util
|
||||||
|
|
||||||
|
|
||||||
|
class CompatibilityFilesTests(unittest.TestCase):
|
||||||
|
@property
|
||||||
|
def package(self):
|
||||||
|
bytes_data = io.BytesIO(b'Hello, world!')
|
||||||
|
return util.create_package(
|
||||||
|
file=bytes_data,
|
||||||
|
path='some_path',
|
||||||
|
contents=('a', 'b', 'c'),
|
||||||
|
)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def files(self):
|
||||||
|
return resources.files(self.package)
|
||||||
|
|
||||||
|
def test_spec_path_iter(self):
|
||||||
|
self.assertEqual(
|
||||||
|
sorted(path.name for path in self.files.iterdir()),
|
||||||
|
['a', 'b', 'c'],
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_child_path_iter(self):
|
||||||
|
self.assertEqual(list((self.files / 'a').iterdir()), [])
|
||||||
|
|
||||||
|
def test_orphan_path_iter(self):
|
||||||
|
self.assertEqual(list((self.files / 'a' / 'a').iterdir()), [])
|
||||||
|
self.assertEqual(list((self.files / 'a' / 'a' / 'a').iterdir()), [])
|
||||||
|
|
||||||
|
def test_spec_path_is(self):
|
||||||
|
self.assertFalse(self.files.is_file())
|
||||||
|
self.assertFalse(self.files.is_dir())
|
||||||
|
|
||||||
|
def test_child_path_is(self):
|
||||||
|
self.assertTrue((self.files / 'a').is_file())
|
||||||
|
self.assertFalse((self.files / 'a').is_dir())
|
||||||
|
|
||||||
|
def test_orphan_path_is(self):
|
||||||
|
self.assertFalse((self.files / 'a' / 'a').is_file())
|
||||||
|
self.assertFalse((self.files / 'a' / 'a').is_dir())
|
||||||
|
self.assertFalse((self.files / 'a' / 'a' / 'a').is_file())
|
||||||
|
self.assertFalse((self.files / 'a' / 'a' / 'a').is_dir())
|
||||||
|
|
||||||
|
def test_spec_path_name(self):
|
||||||
|
self.assertEqual(self.files.name, 'testingpackage')
|
||||||
|
|
||||||
|
def test_child_path_name(self):
|
||||||
|
self.assertEqual((self.files / 'a').name, 'a')
|
||||||
|
|
||||||
|
def test_orphan_path_name(self):
|
||||||
|
self.assertEqual((self.files / 'a' / 'b').name, 'b')
|
||||||
|
self.assertEqual((self.files / 'a' / 'b' / 'c').name, 'c')
|
||||||
|
|
||||||
|
def test_spec_path_open(self):
|
||||||
|
self.assertEqual(self.files.read_bytes(), b'Hello, world!')
|
||||||
|
self.assertEqual(self.files.read_text(), 'Hello, world!')
|
||||||
|
|
||||||
|
def test_child_path_open(self):
|
||||||
|
self.assertEqual((self.files / 'a').read_bytes(), b'Hello, world!')
|
||||||
|
self.assertEqual((self.files / 'a').read_text(), 'Hello, world!')
|
||||||
|
|
||||||
|
def test_orphan_path_open(self):
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
(self.files / 'a' / 'b').read_bytes()
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
(self.files / 'a' / 'b' / 'c').read_bytes()
|
||||||
|
|
||||||
|
def test_open_invalid_mode(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.files.open('0')
|
||||||
|
|
||||||
|
def test_orphan_path_invalid(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
CompatibilityFiles.OrphanPath()
|
||||||
|
|
||||||
|
def test_wrap_spec(self):
|
||||||
|
spec = wrap_spec(self.package)
|
||||||
|
self.assertIsInstance(spec.loader.get_resource_reader(None), CompatibilityFiles)
|
||||||
|
|
||||||
|
|
||||||
|
class CompatibilityFilesNoReaderTests(unittest.TestCase):
|
||||||
|
@property
|
||||||
|
def package(self):
|
||||||
|
return util.create_package_from_loader(None)
|
||||||
|
|
||||||
|
@property
|
||||||
|
def files(self):
|
||||||
|
return resources.files(self.package)
|
||||||
|
|
||||||
|
def test_spec_path_joinpath(self):
|
||||||
|
self.assertIsInstance(self.files / 'a', CompatibilityFiles.OrphanPath)
|
42
lib/importlib_resources/tests/test_contents.py
Normal file
42
lib/importlib_resources/tests/test_contents.py
Normal file
|
@ -0,0 +1,42 @@
|
||||||
|
import unittest
|
||||||
|
import importlib_resources as resources
|
||||||
|
|
||||||
|
from . import data01
|
||||||
|
from . import util
|
||||||
|
|
||||||
|
|
||||||
|
class ContentsTests:
|
||||||
|
expected = {
|
||||||
|
'__init__.py',
|
||||||
|
'binary.file',
|
||||||
|
'subdirectory',
|
||||||
|
'utf-16.file',
|
||||||
|
'utf-8.file',
|
||||||
|
}
|
||||||
|
|
||||||
|
def test_contents(self):
|
||||||
|
assert self.expected <= set(resources.contents(self.data))
|
||||||
|
|
||||||
|
|
||||||
|
class ContentsDiskTests(ContentsTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.data = data01
|
||||||
|
|
||||||
|
|
||||||
|
class ContentsZipTests(ContentsTests, util.ZipSetup, unittest.TestCase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ContentsNamespaceTests(ContentsTests, unittest.TestCase):
|
||||||
|
expected = {
|
||||||
|
# no __init__ because of namespace design
|
||||||
|
# no subdirectory as incidental difference in fixture
|
||||||
|
'binary.file',
|
||||||
|
'utf-16.file',
|
||||||
|
'utf-8.file',
|
||||||
|
}
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
from . import namespacedata01
|
||||||
|
|
||||||
|
self.data = namespacedata01
|
46
lib/importlib_resources/tests/test_files.py
Normal file
46
lib/importlib_resources/tests/test_files.py
Normal file
|
@ -0,0 +1,46 @@
|
||||||
|
import typing
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import importlib_resources as resources
|
||||||
|
from importlib_resources.abc import Traversable
|
||||||
|
from . import data01
|
||||||
|
from . import util
|
||||||
|
|
||||||
|
|
||||||
|
class FilesTests:
|
||||||
|
def test_read_bytes(self):
|
||||||
|
files = resources.files(self.data)
|
||||||
|
actual = files.joinpath('utf-8.file').read_bytes()
|
||||||
|
assert actual == b'Hello, UTF-8 world!\n'
|
||||||
|
|
||||||
|
def test_read_text(self):
|
||||||
|
files = resources.files(self.data)
|
||||||
|
actual = files.joinpath('utf-8.file').read_text(encoding='utf-8')
|
||||||
|
assert actual == 'Hello, UTF-8 world!\n'
|
||||||
|
|
||||||
|
@unittest.skipUnless(
|
||||||
|
hasattr(typing, 'runtime_checkable'),
|
||||||
|
"Only suitable when typing supports runtime_checkable",
|
||||||
|
)
|
||||||
|
def test_traversable(self):
|
||||||
|
assert isinstance(resources.files(self.data), Traversable)
|
||||||
|
|
||||||
|
|
||||||
|
class OpenDiskTests(FilesTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.data = data01
|
||||||
|
|
||||||
|
|
||||||
|
class OpenZipTests(FilesTests, util.ZipSetup, unittest.TestCase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class OpenNamespaceTests(FilesTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
from . import namespacedata01
|
||||||
|
|
||||||
|
self.data = namespacedata01
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
77
lib/importlib_resources/tests/test_open.py
Normal file
77
lib/importlib_resources/tests/test_open.py
Normal file
|
@ -0,0 +1,77 @@
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import importlib_resources as resources
|
||||||
|
from . import data01
|
||||||
|
from . import util
|
||||||
|
|
||||||
|
|
||||||
|
class CommonBinaryTests(util.CommonTests, unittest.TestCase):
|
||||||
|
def execute(self, package, path):
|
||||||
|
with resources.open_binary(package, path):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class CommonTextTests(util.CommonTests, unittest.TestCase):
|
||||||
|
def execute(self, package, path):
|
||||||
|
with resources.open_text(package, path):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class OpenTests:
|
||||||
|
def test_open_binary(self):
|
||||||
|
with resources.open_binary(self.data, 'utf-8.file') as fp:
|
||||||
|
result = fp.read()
|
||||||
|
self.assertEqual(result, b'Hello, UTF-8 world!\n')
|
||||||
|
|
||||||
|
def test_open_text_default_encoding(self):
|
||||||
|
with resources.open_text(self.data, 'utf-8.file') as fp:
|
||||||
|
result = fp.read()
|
||||||
|
self.assertEqual(result, 'Hello, UTF-8 world!\n')
|
||||||
|
|
||||||
|
def test_open_text_given_encoding(self):
|
||||||
|
with resources.open_text(self.data, 'utf-16.file', 'utf-16', 'strict') as fp:
|
||||||
|
result = fp.read()
|
||||||
|
self.assertEqual(result, 'Hello, UTF-16 world!\n')
|
||||||
|
|
||||||
|
def test_open_text_with_errors(self):
|
||||||
|
# Raises UnicodeError without the 'errors' argument.
|
||||||
|
with resources.open_text(self.data, 'utf-16.file', 'utf-8', 'strict') as fp:
|
||||||
|
self.assertRaises(UnicodeError, fp.read)
|
||||||
|
with resources.open_text(self.data, 'utf-16.file', 'utf-8', 'ignore') as fp:
|
||||||
|
result = fp.read()
|
||||||
|
self.assertEqual(
|
||||||
|
result,
|
||||||
|
'H\x00e\x00l\x00l\x00o\x00,\x00 '
|
||||||
|
'\x00U\x00T\x00F\x00-\x001\x006\x00 '
|
||||||
|
'\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00',
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_open_binary_FileNotFoundError(self):
|
||||||
|
self.assertRaises(
|
||||||
|
FileNotFoundError, resources.open_binary, self.data, 'does-not-exist'
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_open_text_FileNotFoundError(self):
|
||||||
|
self.assertRaises(
|
||||||
|
FileNotFoundError, resources.open_text, self.data, 'does-not-exist'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class OpenDiskTests(OpenTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.data = data01
|
||||||
|
|
||||||
|
|
||||||
|
class OpenDiskNamespaceTests(OpenTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
from . import namespacedata01
|
||||||
|
|
||||||
|
self.data = namespacedata01
|
||||||
|
|
||||||
|
|
||||||
|
class OpenZipTests(OpenTests, util.ZipSetup, unittest.TestCase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
61
lib/importlib_resources/tests/test_path.py
Normal file
61
lib/importlib_resources/tests/test_path.py
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
import io
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
import importlib_resources as resources
|
||||||
|
from . import data01
|
||||||
|
from . import util
|
||||||
|
|
||||||
|
|
||||||
|
class CommonTests(util.CommonTests, unittest.TestCase):
|
||||||
|
def execute(self, package, path):
|
||||||
|
with resources.path(package, path):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class PathTests:
|
||||||
|
def test_reading(self):
|
||||||
|
# Path should be readable.
|
||||||
|
# Test also implicitly verifies the returned object is a pathlib.Path
|
||||||
|
# instance.
|
||||||
|
with resources.path(self.data, 'utf-8.file') as path:
|
||||||
|
self.assertTrue(path.name.endswith("utf-8.file"), repr(path))
|
||||||
|
# pathlib.Path.read_text() was introduced in Python 3.5.
|
||||||
|
with path.open('r', encoding='utf-8') as file:
|
||||||
|
text = file.read()
|
||||||
|
self.assertEqual('Hello, UTF-8 world!\n', text)
|
||||||
|
|
||||||
|
|
||||||
|
class PathDiskTests(PathTests, unittest.TestCase):
|
||||||
|
data = data01
|
||||||
|
|
||||||
|
def test_natural_path(self):
|
||||||
|
"""
|
||||||
|
Guarantee the internal implementation detail that
|
||||||
|
file-system-backed resources do not get the tempdir
|
||||||
|
treatment.
|
||||||
|
"""
|
||||||
|
with resources.path(self.data, 'utf-8.file') as path:
|
||||||
|
assert 'data' in str(path)
|
||||||
|
|
||||||
|
|
||||||
|
class PathMemoryTests(PathTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
file = io.BytesIO(b'Hello, UTF-8 world!\n')
|
||||||
|
self.addCleanup(file.close)
|
||||||
|
self.data = util.create_package(
|
||||||
|
file=file, path=FileNotFoundError("package exists only in memory")
|
||||||
|
)
|
||||||
|
self.data.__spec__.origin = None
|
||||||
|
self.data.__spec__.has_location = False
|
||||||
|
|
||||||
|
|
||||||
|
class PathZipTests(PathTests, util.ZipSetup, unittest.TestCase):
|
||||||
|
def test_remove_in_context_manager(self):
|
||||||
|
# It is not an error if the file that was temporarily stashed on the
|
||||||
|
# file system is removed inside the `with` stanza.
|
||||||
|
with resources.path(self.data, 'utf-8.file') as path:
|
||||||
|
path.unlink()
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
67
lib/importlib_resources/tests/test_read.py
Normal file
67
lib/importlib_resources/tests/test_read.py
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
import unittest
|
||||||
|
import importlib_resources as resources
|
||||||
|
|
||||||
|
from . import data01
|
||||||
|
from . import util
|
||||||
|
from importlib import import_module
|
||||||
|
|
||||||
|
|
||||||
|
class CommonBinaryTests(util.CommonTests, unittest.TestCase):
|
||||||
|
def execute(self, package, path):
|
||||||
|
resources.read_binary(package, path)
|
||||||
|
|
||||||
|
|
||||||
|
class CommonTextTests(util.CommonTests, unittest.TestCase):
|
||||||
|
def execute(self, package, path):
|
||||||
|
resources.read_text(package, path)
|
||||||
|
|
||||||
|
|
||||||
|
class ReadTests:
|
||||||
|
def test_read_binary(self):
|
||||||
|
result = resources.read_binary(self.data, 'binary.file')
|
||||||
|
self.assertEqual(result, b'\0\1\2\3')
|
||||||
|
|
||||||
|
def test_read_text_default_encoding(self):
|
||||||
|
result = resources.read_text(self.data, 'utf-8.file')
|
||||||
|
self.assertEqual(result, 'Hello, UTF-8 world!\n')
|
||||||
|
|
||||||
|
def test_read_text_given_encoding(self):
|
||||||
|
result = resources.read_text(self.data, 'utf-16.file', encoding='utf-16')
|
||||||
|
self.assertEqual(result, 'Hello, UTF-16 world!\n')
|
||||||
|
|
||||||
|
def test_read_text_with_errors(self):
|
||||||
|
# Raises UnicodeError without the 'errors' argument.
|
||||||
|
self.assertRaises(UnicodeError, resources.read_text, self.data, 'utf-16.file')
|
||||||
|
result = resources.read_text(self.data, 'utf-16.file', errors='ignore')
|
||||||
|
self.assertEqual(
|
||||||
|
result,
|
||||||
|
'H\x00e\x00l\x00l\x00o\x00,\x00 '
|
||||||
|
'\x00U\x00T\x00F\x00-\x001\x006\x00 '
|
||||||
|
'\x00w\x00o\x00r\x00l\x00d\x00!\x00\n\x00',
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ReadDiskTests(ReadTests, unittest.TestCase):
|
||||||
|
data = data01
|
||||||
|
|
||||||
|
|
||||||
|
class ReadZipTests(ReadTests, util.ZipSetup, unittest.TestCase):
|
||||||
|
def test_read_submodule_resource(self):
|
||||||
|
submodule = import_module('ziptestdata.subdirectory')
|
||||||
|
result = resources.read_binary(submodule, 'binary.file')
|
||||||
|
self.assertEqual(result, b'\0\1\2\3')
|
||||||
|
|
||||||
|
def test_read_submodule_resource_by_name(self):
|
||||||
|
result = resources.read_binary('ziptestdata.subdirectory', 'binary.file')
|
||||||
|
self.assertEqual(result, b'\0\1\2\3')
|
||||||
|
|
||||||
|
|
||||||
|
class ReadNamespaceTests(ReadTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
from . import namespacedata01
|
||||||
|
|
||||||
|
self.data = namespacedata01
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
128
lib/importlib_resources/tests/test_reader.py
Normal file
128
lib/importlib_resources/tests/test_reader.py
Normal file
|
@ -0,0 +1,128 @@
|
||||||
|
import os.path
|
||||||
|
import sys
|
||||||
|
import pathlib
|
||||||
|
import unittest
|
||||||
|
|
||||||
|
from importlib import import_module
|
||||||
|
from importlib_resources.readers import MultiplexedPath, NamespaceReader
|
||||||
|
|
||||||
|
|
||||||
|
class MultiplexedPathTest(unittest.TestCase):
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
path = pathlib.Path(__file__).parent / 'namespacedata01'
|
||||||
|
cls.folder = str(path)
|
||||||
|
|
||||||
|
def test_init_no_paths(self):
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
MultiplexedPath()
|
||||||
|
|
||||||
|
def test_init_file(self):
|
||||||
|
with self.assertRaises(NotADirectoryError):
|
||||||
|
MultiplexedPath(os.path.join(self.folder, 'binary.file'))
|
||||||
|
|
||||||
|
def test_iterdir(self):
|
||||||
|
contents = {path.name for path in MultiplexedPath(self.folder).iterdir()}
|
||||||
|
try:
|
||||||
|
contents.remove('__pycache__')
|
||||||
|
except (KeyError, ValueError):
|
||||||
|
pass
|
||||||
|
self.assertEqual(contents, {'binary.file', 'utf-16.file', 'utf-8.file'})
|
||||||
|
|
||||||
|
def test_iterdir_duplicate(self):
|
||||||
|
data01 = os.path.abspath(os.path.join(__file__, '..', 'data01'))
|
||||||
|
contents = {
|
||||||
|
path.name for path in MultiplexedPath(self.folder, data01).iterdir()
|
||||||
|
}
|
||||||
|
for remove in ('__pycache__', '__init__.pyc'):
|
||||||
|
try:
|
||||||
|
contents.remove(remove)
|
||||||
|
except (KeyError, ValueError):
|
||||||
|
pass
|
||||||
|
self.assertEqual(
|
||||||
|
contents,
|
||||||
|
{'__init__.py', 'binary.file', 'subdirectory', 'utf-16.file', 'utf-8.file'},
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_is_dir(self):
|
||||||
|
self.assertEqual(MultiplexedPath(self.folder).is_dir(), True)
|
||||||
|
|
||||||
|
def test_is_file(self):
|
||||||
|
self.assertEqual(MultiplexedPath(self.folder).is_file(), False)
|
||||||
|
|
||||||
|
def test_open_file(self):
|
||||||
|
path = MultiplexedPath(self.folder)
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
path.read_bytes()
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
path.read_text()
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
path.open()
|
||||||
|
|
||||||
|
def test_join_path(self):
|
||||||
|
prefix = os.path.abspath(os.path.join(__file__, '..'))
|
||||||
|
data01 = os.path.join(prefix, 'data01')
|
||||||
|
path = MultiplexedPath(self.folder, data01)
|
||||||
|
self.assertEqual(
|
||||||
|
str(path.joinpath('binary.file'))[len(prefix) + 1 :],
|
||||||
|
os.path.join('namespacedata01', 'binary.file'),
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
str(path.joinpath('subdirectory'))[len(prefix) + 1 :],
|
||||||
|
os.path.join('data01', 'subdirectory'),
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
str(path.joinpath('imaginary'))[len(prefix) + 1 :],
|
||||||
|
os.path.join('namespacedata01', 'imaginary'),
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_repr(self):
|
||||||
|
self.assertEqual(
|
||||||
|
repr(MultiplexedPath(self.folder)),
|
||||||
|
f"MultiplexedPath('{self.folder}')",
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_name(self):
|
||||||
|
self.assertEqual(
|
||||||
|
MultiplexedPath(self.folder).name,
|
||||||
|
os.path.basename(self.folder),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class NamespaceReaderTest(unittest.TestCase):
|
||||||
|
site_dir = str(pathlib.Path(__file__).parent)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
sys.path.append(cls.site_dir)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
sys.path.remove(cls.site_dir)
|
||||||
|
|
||||||
|
def test_init_error(self):
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
NamespaceReader(['path1', 'path2'])
|
||||||
|
|
||||||
|
def test_resource_path(self):
|
||||||
|
namespacedata01 = import_module('namespacedata01')
|
||||||
|
reader = NamespaceReader(namespacedata01.__spec__.submodule_search_locations)
|
||||||
|
|
||||||
|
root = os.path.abspath(os.path.join(__file__, '..', 'namespacedata01'))
|
||||||
|
self.assertEqual(
|
||||||
|
reader.resource_path('binary.file'), os.path.join(root, 'binary.file')
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
reader.resource_path('imaginary'), os.path.join(root, 'imaginary')
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_files(self):
|
||||||
|
namespacedata01 = import_module('namespacedata01')
|
||||||
|
reader = NamespaceReader(namespacedata01.__spec__.submodule_search_locations)
|
||||||
|
root = os.path.abspath(os.path.join(__file__, '..', 'namespacedata01'))
|
||||||
|
self.assertIsInstance(reader.files(), MultiplexedPath)
|
||||||
|
self.assertEqual(repr(reader.files()), f"MultiplexedPath('{root}')")
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
254
lib/importlib_resources/tests/test_resource.py
Normal file
254
lib/importlib_resources/tests/test_resource.py
Normal file
|
@ -0,0 +1,254 @@
|
||||||
|
import sys
|
||||||
|
import unittest
|
||||||
|
import importlib_resources as resources
|
||||||
|
import uuid
|
||||||
|
import pathlib
|
||||||
|
|
||||||
|
from . import data01
|
||||||
|
from . import zipdata01, zipdata02
|
||||||
|
from . import util
|
||||||
|
from importlib import import_module
|
||||||
|
from ._compat import import_helper, unlink
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceTests:
|
||||||
|
# Subclasses are expected to set the `data` attribute.
|
||||||
|
|
||||||
|
def test_is_resource_good_path(self):
|
||||||
|
self.assertTrue(resources.is_resource(self.data, 'binary.file'))
|
||||||
|
|
||||||
|
def test_is_resource_missing(self):
|
||||||
|
self.assertFalse(resources.is_resource(self.data, 'not-a-file'))
|
||||||
|
|
||||||
|
def test_is_resource_subresource_directory(self):
|
||||||
|
# Directories are not resources.
|
||||||
|
self.assertFalse(resources.is_resource(self.data, 'subdirectory'))
|
||||||
|
|
||||||
|
def test_contents(self):
|
||||||
|
contents = set(resources.contents(self.data))
|
||||||
|
# There may be cruft in the directory listing of the data directory.
|
||||||
|
# It could have a __pycache__ directory,
|
||||||
|
# an artifact of the
|
||||||
|
# test suite importing these modules, which
|
||||||
|
# are not germane to this test, so just filter them out.
|
||||||
|
contents.discard('__pycache__')
|
||||||
|
self.assertEqual(
|
||||||
|
sorted(contents),
|
||||||
|
[
|
||||||
|
'__init__.py',
|
||||||
|
'binary.file',
|
||||||
|
'subdirectory',
|
||||||
|
'utf-16.file',
|
||||||
|
'utf-8.file',
|
||||||
|
],
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceDiskTests(ResourceTests, unittest.TestCase):
|
||||||
|
def setUp(self):
|
||||||
|
self.data = data01
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceZipTests(ResourceTests, util.ZipSetup, unittest.TestCase):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceLoaderTests(unittest.TestCase):
|
||||||
|
def test_resource_contents(self):
|
||||||
|
package = util.create_package(
|
||||||
|
file=data01, path=data01.__file__, contents=['A', 'B', 'C']
|
||||||
|
)
|
||||||
|
self.assertEqual(set(resources.contents(package)), {'A', 'B', 'C'})
|
||||||
|
|
||||||
|
def test_resource_is_resource(self):
|
||||||
|
package = util.create_package(
|
||||||
|
file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F']
|
||||||
|
)
|
||||||
|
self.assertTrue(resources.is_resource(package, 'B'))
|
||||||
|
|
||||||
|
def test_resource_directory_is_not_resource(self):
|
||||||
|
package = util.create_package(
|
||||||
|
file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F']
|
||||||
|
)
|
||||||
|
self.assertFalse(resources.is_resource(package, 'D'))
|
||||||
|
|
||||||
|
def test_resource_missing_is_not_resource(self):
|
||||||
|
package = util.create_package(
|
||||||
|
file=data01, path=data01.__file__, contents=['A', 'B', 'C', 'D/E', 'D/F']
|
||||||
|
)
|
||||||
|
self.assertFalse(resources.is_resource(package, 'Z'))
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceCornerCaseTests(unittest.TestCase):
|
||||||
|
def test_package_has_no_reader_fallback(self):
|
||||||
|
# Test odd ball packages which:
|
||||||
|
# 1. Do not have a ResourceReader as a loader
|
||||||
|
# 2. Are not on the file system
|
||||||
|
# 3. Are not in a zip file
|
||||||
|
module = util.create_package(
|
||||||
|
file=data01, path=data01.__file__, contents=['A', 'B', 'C']
|
||||||
|
)
|
||||||
|
# Give the module a dummy loader.
|
||||||
|
module.__loader__ = object()
|
||||||
|
# Give the module a dummy origin.
|
||||||
|
module.__file__ = '/path/which/shall/not/be/named'
|
||||||
|
module.__spec__.loader = module.__loader__
|
||||||
|
module.__spec__.origin = module.__file__
|
||||||
|
self.assertFalse(resources.is_resource(module, 'A'))
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceFromZipsTest01(util.ZipSetupBase, unittest.TestCase):
|
||||||
|
ZIP_MODULE = zipdata01 # type: ignore
|
||||||
|
|
||||||
|
def test_is_submodule_resource(self):
|
||||||
|
submodule = import_module('ziptestdata.subdirectory')
|
||||||
|
self.assertTrue(resources.is_resource(submodule, 'binary.file'))
|
||||||
|
|
||||||
|
def test_read_submodule_resource_by_name(self):
|
||||||
|
self.assertTrue(
|
||||||
|
resources.is_resource('ziptestdata.subdirectory', 'binary.file')
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_submodule_contents(self):
|
||||||
|
submodule = import_module('ziptestdata.subdirectory')
|
||||||
|
self.assertEqual(
|
||||||
|
set(resources.contents(submodule)), {'__init__.py', 'binary.file'}
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_submodule_contents_by_name(self):
|
||||||
|
self.assertEqual(
|
||||||
|
set(resources.contents('ziptestdata.subdirectory')),
|
||||||
|
{'__init__.py', 'binary.file'},
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceFromZipsTest02(util.ZipSetupBase, unittest.TestCase):
|
||||||
|
ZIP_MODULE = zipdata02 # type: ignore
|
||||||
|
|
||||||
|
def test_unrelated_contents(self):
|
||||||
|
"""
|
||||||
|
Test thata zip with two unrelated subpackages return
|
||||||
|
distinct resources. Ref python/importlib_resources#44.
|
||||||
|
"""
|
||||||
|
self.assertEqual(
|
||||||
|
set(resources.contents('ziptestdata.one')), {'__init__.py', 'resource1.txt'}
|
||||||
|
)
|
||||||
|
self.assertEqual(
|
||||||
|
set(resources.contents('ziptestdata.two')), {'__init__.py', 'resource2.txt'}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class DeletingZipsTest(unittest.TestCase):
|
||||||
|
"""Having accessed resources in a zip file should not keep an open
|
||||||
|
reference to the zip.
|
||||||
|
"""
|
||||||
|
|
||||||
|
ZIP_MODULE = zipdata01
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
modules = import_helper.modules_setup()
|
||||||
|
self.addCleanup(import_helper.modules_cleanup, *modules)
|
||||||
|
|
||||||
|
data_path = pathlib.Path(self.ZIP_MODULE.__file__)
|
||||||
|
data_dir = data_path.parent
|
||||||
|
self.source_zip_path = data_dir / 'ziptestdata.zip'
|
||||||
|
self.zip_path = pathlib.Path(f'{uuid.uuid4()}.zip').absolute()
|
||||||
|
self.zip_path.write_bytes(self.source_zip_path.read_bytes())
|
||||||
|
sys.path.append(str(self.zip_path))
|
||||||
|
self.data = import_module('ziptestdata')
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
try:
|
||||||
|
sys.path.remove(str(self.zip_path))
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
del sys.path_importer_cache[str(self.zip_path)]
|
||||||
|
del sys.modules[self.data.__name__]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
unlink(self.zip_path)
|
||||||
|
except OSError:
|
||||||
|
# If the test fails, this will probably fail too
|
||||||
|
pass
|
||||||
|
|
||||||
|
def test_contents_does_not_keep_open(self):
|
||||||
|
c = resources.contents('ziptestdata')
|
||||||
|
self.zip_path.unlink()
|
||||||
|
del c
|
||||||
|
|
||||||
|
def test_is_resource_does_not_keep_open(self):
|
||||||
|
c = resources.is_resource('ziptestdata', 'binary.file')
|
||||||
|
self.zip_path.unlink()
|
||||||
|
del c
|
||||||
|
|
||||||
|
def test_is_resource_failure_does_not_keep_open(self):
|
||||||
|
c = resources.is_resource('ziptestdata', 'not-present')
|
||||||
|
self.zip_path.unlink()
|
||||||
|
del c
|
||||||
|
|
||||||
|
@unittest.skip("Desired but not supported.")
|
||||||
|
def test_path_does_not_keep_open(self):
|
||||||
|
c = resources.path('ziptestdata', 'binary.file')
|
||||||
|
self.zip_path.unlink()
|
||||||
|
del c
|
||||||
|
|
||||||
|
def test_entered_path_does_not_keep_open(self):
|
||||||
|
# This is what certifi does on import to make its bundle
|
||||||
|
# available for the process duration.
|
||||||
|
c = resources.path('ziptestdata', 'binary.file').__enter__()
|
||||||
|
self.zip_path.unlink()
|
||||||
|
del c
|
||||||
|
|
||||||
|
def test_read_binary_does_not_keep_open(self):
|
||||||
|
c = resources.read_binary('ziptestdata', 'binary.file')
|
||||||
|
self.zip_path.unlink()
|
||||||
|
del c
|
||||||
|
|
||||||
|
def test_read_text_does_not_keep_open(self):
|
||||||
|
c = resources.read_text('ziptestdata', 'utf-8.file', encoding='utf-8')
|
||||||
|
self.zip_path.unlink()
|
||||||
|
del c
|
||||||
|
|
||||||
|
|
||||||
|
class ResourceFromNamespaceTest01(unittest.TestCase):
|
||||||
|
site_dir = str(pathlib.Path(__file__).parent)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
sys.path.append(cls.site_dir)
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
sys.path.remove(cls.site_dir)
|
||||||
|
|
||||||
|
def test_is_submodule_resource(self):
|
||||||
|
self.assertTrue(
|
||||||
|
resources.is_resource(import_module('namespacedata01'), 'binary.file')
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_read_submodule_resource_by_name(self):
|
||||||
|
self.assertTrue(resources.is_resource('namespacedata01', 'binary.file'))
|
||||||
|
|
||||||
|
def test_submodule_contents(self):
|
||||||
|
contents = set(resources.contents(import_module('namespacedata01')))
|
||||||
|
try:
|
||||||
|
contents.remove('__pycache__')
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
self.assertEqual(contents, {'binary.file', 'utf-8.file', 'utf-16.file'})
|
||||||
|
|
||||||
|
def test_submodule_contents_by_name(self):
|
||||||
|
contents = set(resources.contents('namespacedata01'))
|
||||||
|
try:
|
||||||
|
contents.remove('__pycache__')
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
self.assertEqual(contents, {'binary.file', 'utf-8.file', 'utf-16.file'})
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
53
lib/importlib_resources/tests/update-zips.py
Normal file
53
lib/importlib_resources/tests/update-zips.py
Normal file
|
@ -0,0 +1,53 @@
|
||||||
|
"""
|
||||||
|
Generate the zip test data files.
|
||||||
|
|
||||||
|
Run to build the tests/zipdataNN/ziptestdata.zip files from
|
||||||
|
files in tests/dataNN.
|
||||||
|
|
||||||
|
Replaces the file with the working copy, but does commit anything
|
||||||
|
to the source repo.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import contextlib
|
||||||
|
import os
|
||||||
|
import pathlib
|
||||||
|
import zipfile
|
||||||
|
|
||||||
|
|
||||||
|
def main():
|
||||||
|
"""
|
||||||
|
>>> from unittest import mock
|
||||||
|
>>> monkeypatch = getfixture('monkeypatch')
|
||||||
|
>>> monkeypatch.setattr(zipfile, 'ZipFile', mock.MagicMock())
|
||||||
|
>>> print(); main() # print workaround for bpo-32509
|
||||||
|
<BLANKLINE>
|
||||||
|
...data01... -> ziptestdata/...
|
||||||
|
...
|
||||||
|
...data02... -> ziptestdata/...
|
||||||
|
...
|
||||||
|
"""
|
||||||
|
suffixes = '01', '02'
|
||||||
|
tuple(map(generate, suffixes))
|
||||||
|
|
||||||
|
|
||||||
|
def generate(suffix):
|
||||||
|
root = pathlib.Path(__file__).parent.relative_to(os.getcwd())
|
||||||
|
zfpath = root / f'zipdata{suffix}/ziptestdata.zip'
|
||||||
|
with zipfile.ZipFile(zfpath, 'w') as zf:
|
||||||
|
for src, rel in walk(root / f'data{suffix}'):
|
||||||
|
dst = 'ziptestdata' / pathlib.PurePosixPath(rel.as_posix())
|
||||||
|
print(src, '->', dst)
|
||||||
|
zf.write(src, dst)
|
||||||
|
|
||||||
|
|
||||||
|
def walk(datapath):
|
||||||
|
for dirpath, dirnames, filenames in os.walk(datapath):
|
||||||
|
with contextlib.suppress(KeyError):
|
||||||
|
dirnames.remove('__pycache__')
|
||||||
|
for filename in filenames:
|
||||||
|
res = pathlib.Path(dirpath) / filename
|
||||||
|
rel = res.relative_to(datapath)
|
||||||
|
yield res, rel
|
||||||
|
|
||||||
|
|
||||||
|
__name__ == '__main__' and main()
|
190
lib/importlib_resources/tests/util.py
Normal file
190
lib/importlib_resources/tests/util.py
Normal file
|
@ -0,0 +1,190 @@
|
||||||
|
import abc
|
||||||
|
import importlib
|
||||||
|
import io
|
||||||
|
import sys
|
||||||
|
import types
|
||||||
|
from pathlib import Path, PurePath
|
||||||
|
|
||||||
|
from . import data01
|
||||||
|
from . import zipdata01
|
||||||
|
from ..abc import ResourceReader
|
||||||
|
from ._compat import import_helper
|
||||||
|
|
||||||
|
|
||||||
|
from importlib.machinery import ModuleSpec
|
||||||
|
|
||||||
|
|
||||||
|
class Reader(ResourceReader):
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
vars(self).update(kwargs)
|
||||||
|
|
||||||
|
def get_resource_reader(self, package):
|
||||||
|
return self
|
||||||
|
|
||||||
|
def open_resource(self, path):
|
||||||
|
self._path = path
|
||||||
|
if isinstance(self.file, Exception):
|
||||||
|
raise self.file
|
||||||
|
return self.file
|
||||||
|
|
||||||
|
def resource_path(self, path_):
|
||||||
|
self._path = path_
|
||||||
|
if isinstance(self.path, Exception):
|
||||||
|
raise self.path
|
||||||
|
return self.path
|
||||||
|
|
||||||
|
def is_resource(self, path_):
|
||||||
|
self._path = path_
|
||||||
|
if isinstance(self.path, Exception):
|
||||||
|
raise self.path
|
||||||
|
|
||||||
|
def part(entry):
|
||||||
|
return entry.split('/')
|
||||||
|
|
||||||
|
return any(
|
||||||
|
len(parts) == 1 and parts[0] == path_ for parts in map(part, self._contents)
|
||||||
|
)
|
||||||
|
|
||||||
|
def contents(self):
|
||||||
|
if isinstance(self.path, Exception):
|
||||||
|
raise self.path
|
||||||
|
yield from self._contents
|
||||||
|
|
||||||
|
|
||||||
|
def create_package_from_loader(loader, is_package=True):
|
||||||
|
name = 'testingpackage'
|
||||||
|
module = types.ModuleType(name)
|
||||||
|
spec = ModuleSpec(name, loader, origin='does-not-exist', is_package=is_package)
|
||||||
|
module.__spec__ = spec
|
||||||
|
module.__loader__ = loader
|
||||||
|
return module
|
||||||
|
|
||||||
|
|
||||||
|
def create_package(file=None, path=None, is_package=True, contents=()):
|
||||||
|
return create_package_from_loader(
|
||||||
|
Reader(file=file, path=path, _contents=contents),
|
||||||
|
is_package,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class CommonTests(metaclass=abc.ABCMeta):
|
||||||
|
"""
|
||||||
|
Tests shared by test_open, test_path, and test_read.
|
||||||
|
"""
|
||||||
|
|
||||||
|
@abc.abstractmethod
|
||||||
|
def execute(self, package, path):
|
||||||
|
"""
|
||||||
|
Call the pertinent legacy API function (e.g. open_text, path)
|
||||||
|
on package and path.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def test_package_name(self):
|
||||||
|
# Passing in the package name should succeed.
|
||||||
|
self.execute(data01.__name__, 'utf-8.file')
|
||||||
|
|
||||||
|
def test_package_object(self):
|
||||||
|
# Passing in the package itself should succeed.
|
||||||
|
self.execute(data01, 'utf-8.file')
|
||||||
|
|
||||||
|
def test_string_path(self):
|
||||||
|
# Passing in a string for the path should succeed.
|
||||||
|
path = 'utf-8.file'
|
||||||
|
self.execute(data01, path)
|
||||||
|
|
||||||
|
def test_pathlib_path(self):
|
||||||
|
# Passing in a pathlib.PurePath object for the path should succeed.
|
||||||
|
path = PurePath('utf-8.file')
|
||||||
|
self.execute(data01, path)
|
||||||
|
|
||||||
|
def test_absolute_path(self):
|
||||||
|
# An absolute path is a ValueError.
|
||||||
|
path = Path(__file__)
|
||||||
|
full_path = path.parent / 'utf-8.file'
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.execute(data01, full_path)
|
||||||
|
|
||||||
|
def test_relative_path(self):
|
||||||
|
# A reative path is a ValueError.
|
||||||
|
with self.assertRaises(ValueError):
|
||||||
|
self.execute(data01, '../data01/utf-8.file')
|
||||||
|
|
||||||
|
def test_importing_module_as_side_effect(self):
|
||||||
|
# The anchor package can already be imported.
|
||||||
|
del sys.modules[data01.__name__]
|
||||||
|
self.execute(data01.__name__, 'utf-8.file')
|
||||||
|
|
||||||
|
def test_non_package_by_name(self):
|
||||||
|
# The anchor package cannot be a module.
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
self.execute(__name__, 'utf-8.file')
|
||||||
|
|
||||||
|
def test_non_package_by_package(self):
|
||||||
|
# The anchor package cannot be a module.
|
||||||
|
with self.assertRaises(TypeError):
|
||||||
|
module = sys.modules['importlib_resources.tests.util']
|
||||||
|
self.execute(module, 'utf-8.file')
|
||||||
|
|
||||||
|
def test_missing_path(self):
|
||||||
|
# Attempting to open or read or request the path for a
|
||||||
|
# non-existent path should succeed if open_resource
|
||||||
|
# can return a viable data stream.
|
||||||
|
bytes_data = io.BytesIO(b'Hello, world!')
|
||||||
|
package = create_package(file=bytes_data, path=FileNotFoundError())
|
||||||
|
self.execute(package, 'utf-8.file')
|
||||||
|
self.assertEqual(package.__loader__._path, 'utf-8.file')
|
||||||
|
|
||||||
|
def test_extant_path(self):
|
||||||
|
# Attempting to open or read or request the path when the
|
||||||
|
# path does exist should still succeed. Does not assert
|
||||||
|
# anything about the result.
|
||||||
|
bytes_data = io.BytesIO(b'Hello, world!')
|
||||||
|
# any path that exists
|
||||||
|
path = __file__
|
||||||
|
package = create_package(file=bytes_data, path=path)
|
||||||
|
self.execute(package, 'utf-8.file')
|
||||||
|
self.assertEqual(package.__loader__._path, 'utf-8.file')
|
||||||
|
|
||||||
|
def test_useless_loader(self):
|
||||||
|
package = create_package(file=FileNotFoundError(), path=FileNotFoundError())
|
||||||
|
with self.assertRaises(FileNotFoundError):
|
||||||
|
self.execute(package, 'utf-8.file')
|
||||||
|
|
||||||
|
|
||||||
|
class ZipSetupBase:
|
||||||
|
ZIP_MODULE = None
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def setUpClass(cls):
|
||||||
|
data_path = Path(cls.ZIP_MODULE.__file__)
|
||||||
|
data_dir = data_path.parent
|
||||||
|
cls._zip_path = str(data_dir / 'ziptestdata.zip')
|
||||||
|
sys.path.append(cls._zip_path)
|
||||||
|
cls.data = importlib.import_module('ziptestdata')
|
||||||
|
|
||||||
|
@classmethod
|
||||||
|
def tearDownClass(cls):
|
||||||
|
try:
|
||||||
|
sys.path.remove(cls._zip_path)
|
||||||
|
except ValueError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
del sys.path_importer_cache[cls._zip_path]
|
||||||
|
del sys.modules[cls.data.__name__]
|
||||||
|
except KeyError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
try:
|
||||||
|
del cls.data
|
||||||
|
del cls._zip_path
|
||||||
|
except AttributeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
modules = import_helper.modules_setup()
|
||||||
|
self.addCleanup(import_helper.modules_cleanup, *modules)
|
||||||
|
|
||||||
|
|
||||||
|
class ZipSetup(ZipSetupBase):
|
||||||
|
ZIP_MODULE = zipdata01 # type: ignore
|
0
lib/importlib_resources/tests/zipdata01/__init__.py
Normal file
0
lib/importlib_resources/tests/zipdata01/__init__.py
Normal file
BIN
lib/importlib_resources/tests/zipdata01/ziptestdata.zip
Normal file
BIN
lib/importlib_resources/tests/zipdata01/ziptestdata.zip
Normal file
Binary file not shown.
0
lib/importlib_resources/tests/zipdata02/__init__.py
Normal file
0
lib/importlib_resources/tests/zipdata02/__init__.py
Normal file
BIN
lib/importlib_resources/tests/zipdata02/ziptestdata.zip
Normal file
BIN
lib/importlib_resources/tests/zipdata02/ziptestdata.zip
Normal file
Binary file not shown.
Loading…
Add table
Add a link
Reference in a new issue