Remove Python 2 handling code (#2098)

* Remove Python 2 update modal

* Remove Python 2 handling code

* Remove backports dependencies

* Remove uses of future and __future__

* Fix import

* Remove requirements

* Update lib folder

* Clean up imports and blank lines

---------

Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>
This commit is contained in:
Tom Niget 2024-05-10 07:18:08 +02:00 committed by GitHub
commit de3393d62b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
97 changed files with 7443 additions and 2917 deletions

48
lib/typeguard/__init__.py Normal file
View file

@ -0,0 +1,48 @@
import os
from typing import Any
from ._checkers import TypeCheckerCallable as TypeCheckerCallable
from ._checkers import TypeCheckLookupCallback as TypeCheckLookupCallback
from ._checkers import check_type_internal as check_type_internal
from ._checkers import checker_lookup_functions as checker_lookup_functions
from ._checkers import load_plugins as load_plugins
from ._config import CollectionCheckStrategy as CollectionCheckStrategy
from ._config import ForwardRefPolicy as ForwardRefPolicy
from ._config import TypeCheckConfiguration as TypeCheckConfiguration
from ._decorators import typechecked as typechecked
from ._decorators import typeguard_ignore as typeguard_ignore
from ._exceptions import InstrumentationWarning as InstrumentationWarning
from ._exceptions import TypeCheckError as TypeCheckError
from ._exceptions import TypeCheckWarning as TypeCheckWarning
from ._exceptions import TypeHintWarning as TypeHintWarning
from ._functions import TypeCheckFailCallback as TypeCheckFailCallback
from ._functions import check_type as check_type
from ._functions import warn_on_error as warn_on_error
from ._importhook import ImportHookManager as ImportHookManager
from ._importhook import TypeguardFinder as TypeguardFinder
from ._importhook import install_import_hook as install_import_hook
from ._memo import TypeCheckMemo as TypeCheckMemo
from ._suppression import suppress_type_checks as suppress_type_checks
from ._utils import Unset as Unset
# Re-export imports so they look like they live directly in this package
for value in list(locals().values()):
if getattr(value, "__module__", "").startswith(f"{__name__}."):
value.__module__ = __name__
config: TypeCheckConfiguration
def __getattr__(name: str) -> Any:
if name == "config":
from ._config import global_config
return global_config
raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
# Automatically load checker lookup functions unless explicitly disabled
if "TYPEGUARD_DISABLE_PLUGIN_AUTOLOAD" not in os.environ:
load_plugins()

910
lib/typeguard/_checkers.py Normal file
View file

@ -0,0 +1,910 @@
from __future__ import annotations
import collections.abc
import inspect
import sys
import types
import typing
import warnings
from enum import Enum
from inspect import Parameter, isclass, isfunction
from io import BufferedIOBase, IOBase, RawIOBase, TextIOBase
from textwrap import indent
from typing import (
IO,
AbstractSet,
Any,
BinaryIO,
Callable,
Dict,
ForwardRef,
List,
Mapping,
MutableMapping,
NewType,
Optional,
Sequence,
Set,
TextIO,
Tuple,
Type,
TypeVar,
Union,
)
from unittest.mock import Mock
try:
import typing_extensions
except ImportError:
typing_extensions = None # type: ignore[assignment]
from ._config import ForwardRefPolicy
from ._exceptions import TypeCheckError, TypeHintWarning
from ._memo import TypeCheckMemo
from ._utils import evaluate_forwardref, get_stacklevel, get_type_name, qualified_name
if sys.version_info >= (3, 13):
from typing import is_typeddict
else:
from typing_extensions import is_typeddict
if sys.version_info >= (3, 11):
from typing import (
Annotated,
NotRequired,
TypeAlias,
get_args,
get_origin,
)
SubclassableAny = Any
else:
from typing_extensions import (
Annotated,
NotRequired,
TypeAlias,
get_args,
get_origin,
)
from typing_extensions import Any as SubclassableAny
if sys.version_info >= (3, 10):
from importlib.metadata import entry_points
from typing import ParamSpec
else:
from importlib_metadata import entry_points
from typing_extensions import ParamSpec
TypeCheckerCallable: TypeAlias = Callable[
[Any, Any, Tuple[Any, ...], TypeCheckMemo], Any
]
TypeCheckLookupCallback: TypeAlias = Callable[
[Any, Tuple[Any, ...], Tuple[Any, ...]], Optional[TypeCheckerCallable]
]
checker_lookup_functions: list[TypeCheckLookupCallback] = []
generic_alias_types: tuple[type, ...] = (type(List), type(List[Any]))
if sys.version_info >= (3, 9):
generic_alias_types += (types.GenericAlias,)
# Sentinel
_missing = object()
# Lifted from mypy.sharedparse
BINARY_MAGIC_METHODS = {
"__add__",
"__and__",
"__cmp__",
"__divmod__",
"__div__",
"__eq__",
"__floordiv__",
"__ge__",
"__gt__",
"__iadd__",
"__iand__",
"__idiv__",
"__ifloordiv__",
"__ilshift__",
"__imatmul__",
"__imod__",
"__imul__",
"__ior__",
"__ipow__",
"__irshift__",
"__isub__",
"__itruediv__",
"__ixor__",
"__le__",
"__lshift__",
"__lt__",
"__matmul__",
"__mod__",
"__mul__",
"__ne__",
"__or__",
"__pow__",
"__radd__",
"__rand__",
"__rdiv__",
"__rfloordiv__",
"__rlshift__",
"__rmatmul__",
"__rmod__",
"__rmul__",
"__ror__",
"__rpow__",
"__rrshift__",
"__rshift__",
"__rsub__",
"__rtruediv__",
"__rxor__",
"__sub__",
"__truediv__",
"__xor__",
}
def check_callable(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not callable(value):
raise TypeCheckError("is not callable")
if args:
try:
signature = inspect.signature(value)
except (TypeError, ValueError):
return
argument_types = args[0]
if isinstance(argument_types, list) and not any(
type(item) is ParamSpec for item in argument_types
):
# The callable must not have keyword-only arguments without defaults
unfulfilled_kwonlyargs = [
param.name
for param in signature.parameters.values()
if param.kind == Parameter.KEYWORD_ONLY
and param.default == Parameter.empty
]
if unfulfilled_kwonlyargs:
raise TypeCheckError(
f"has mandatory keyword-only arguments in its declaration: "
f'{", ".join(unfulfilled_kwonlyargs)}'
)
num_positional_args = num_mandatory_pos_args = 0
has_varargs = False
for param in signature.parameters.values():
if param.kind in (
Parameter.POSITIONAL_ONLY,
Parameter.POSITIONAL_OR_KEYWORD,
):
num_positional_args += 1
if param.default is Parameter.empty:
num_mandatory_pos_args += 1
elif param.kind == Parameter.VAR_POSITIONAL:
has_varargs = True
if num_mandatory_pos_args > len(argument_types):
raise TypeCheckError(
f"has too many mandatory positional arguments in its declaration; "
f"expected {len(argument_types)} but {num_mandatory_pos_args} "
f"mandatory positional argument(s) declared"
)
elif not has_varargs and num_positional_args < len(argument_types):
raise TypeCheckError(
f"has too few arguments in its declaration; expected "
f"{len(argument_types)} but {num_positional_args} argument(s) "
f"declared"
)
def check_mapping(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if origin_type is Dict or origin_type is dict:
if not isinstance(value, dict):
raise TypeCheckError("is not a dict")
if origin_type is MutableMapping or origin_type is collections.abc.MutableMapping:
if not isinstance(value, collections.abc.MutableMapping):
raise TypeCheckError("is not a mutable mapping")
elif not isinstance(value, collections.abc.Mapping):
raise TypeCheckError("is not a mapping")
if args:
key_type, value_type = args
if key_type is not Any or value_type is not Any:
samples = memo.config.collection_check_strategy.iterate_samples(
value.items()
)
for k, v in samples:
try:
check_type_internal(k, key_type, memo)
except TypeCheckError as exc:
exc.append_path_element(f"key {k!r}")
raise
try:
check_type_internal(v, value_type, memo)
except TypeCheckError as exc:
exc.append_path_element(f"value of key {k!r}")
raise
def check_typed_dict(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not isinstance(value, dict):
raise TypeCheckError("is not a dict")
declared_keys = frozenset(origin_type.__annotations__)
if hasattr(origin_type, "__required_keys__"):
required_keys = set(origin_type.__required_keys__)
else: # py3.8 and lower
required_keys = set(declared_keys) if origin_type.__total__ else set()
existing_keys = set(value)
extra_keys = existing_keys - declared_keys
if extra_keys:
keys_formatted = ", ".join(f'"{key}"' for key in sorted(extra_keys, key=repr))
raise TypeCheckError(f"has unexpected extra key(s): {keys_formatted}")
# Detect NotRequired fields which are hidden by get_type_hints()
type_hints: dict[str, type] = {}
for key, annotation in origin_type.__annotations__.items():
if isinstance(annotation, ForwardRef):
annotation = evaluate_forwardref(annotation, memo)
if get_origin(annotation) is NotRequired:
required_keys.discard(key)
annotation = get_args(annotation)[0]
type_hints[key] = annotation
missing_keys = required_keys - existing_keys
if missing_keys:
keys_formatted = ", ".join(f'"{key}"' for key in sorted(missing_keys, key=repr))
raise TypeCheckError(f"is missing required key(s): {keys_formatted}")
for key, argtype in type_hints.items():
argvalue = value.get(key, _missing)
if argvalue is not _missing:
try:
check_type_internal(argvalue, argtype, memo)
except TypeCheckError as exc:
exc.append_path_element(f"value of key {key!r}")
raise
def check_list(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not isinstance(value, list):
raise TypeCheckError("is not a list")
if args and args != (Any,):
samples = memo.config.collection_check_strategy.iterate_samples(value)
for i, v in enumerate(samples):
try:
check_type_internal(v, args[0], memo)
except TypeCheckError as exc:
exc.append_path_element(f"item {i}")
raise
def check_sequence(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not isinstance(value, collections.abc.Sequence):
raise TypeCheckError("is not a sequence")
if args and args != (Any,):
samples = memo.config.collection_check_strategy.iterate_samples(value)
for i, v in enumerate(samples):
try:
check_type_internal(v, args[0], memo)
except TypeCheckError as exc:
exc.append_path_element(f"item {i}")
raise
def check_set(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if origin_type is frozenset:
if not isinstance(value, frozenset):
raise TypeCheckError("is not a frozenset")
elif not isinstance(value, AbstractSet):
raise TypeCheckError("is not a set")
if args and args != (Any,):
samples = memo.config.collection_check_strategy.iterate_samples(value)
for v in samples:
try:
check_type_internal(v, args[0], memo)
except TypeCheckError as exc:
exc.append_path_element(f"[{v}]")
raise
def check_tuple(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
# Specialized check for NamedTuples
if field_types := getattr(origin_type, "__annotations__", None):
if not isinstance(value, origin_type):
raise TypeCheckError(
f"is not a named tuple of type {qualified_name(origin_type)}"
)
for name, field_type in field_types.items():
try:
check_type_internal(getattr(value, name), field_type, memo)
except TypeCheckError as exc:
exc.append_path_element(f"attribute {name!r}")
raise
return
elif not isinstance(value, tuple):
raise TypeCheckError("is not a tuple")
if args:
use_ellipsis = args[-1] is Ellipsis
tuple_params = args[: -1 if use_ellipsis else None]
else:
# Unparametrized Tuple or plain tuple
return
if use_ellipsis:
element_type = tuple_params[0]
samples = memo.config.collection_check_strategy.iterate_samples(value)
for i, element in enumerate(samples):
try:
check_type_internal(element, element_type, memo)
except TypeCheckError as exc:
exc.append_path_element(f"item {i}")
raise
elif tuple_params == ((),):
if value != ():
raise TypeCheckError("is not an empty tuple")
else:
if len(value) != len(tuple_params):
raise TypeCheckError(
f"has wrong number of elements (expected {len(tuple_params)}, got "
f"{len(value)} instead)"
)
for i, (element, element_type) in enumerate(zip(value, tuple_params)):
try:
check_type_internal(element, element_type, memo)
except TypeCheckError as exc:
exc.append_path_element(f"item {i}")
raise
def check_union(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
errors: dict[str, TypeCheckError] = {}
try:
for type_ in args:
try:
check_type_internal(value, type_, memo)
return
except TypeCheckError as exc:
errors[get_type_name(type_)] = exc
formatted_errors = indent(
"\n".join(f"{key}: {error}" for key, error in errors.items()), " "
)
finally:
del errors # avoid creating ref cycle
raise TypeCheckError(f"did not match any element in the union:\n{formatted_errors}")
def check_uniontype(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
errors: dict[str, TypeCheckError] = {}
for type_ in args:
try:
check_type_internal(value, type_, memo)
return
except TypeCheckError as exc:
errors[get_type_name(type_)] = exc
formatted_errors = indent(
"\n".join(f"{key}: {error}" for key, error in errors.items()), " "
)
raise TypeCheckError(f"did not match any element in the union:\n{formatted_errors}")
def check_class(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not isclass(value) and not isinstance(value, generic_alias_types):
raise TypeCheckError("is not a class")
if not args:
return
if isinstance(args[0], ForwardRef):
expected_class = evaluate_forwardref(args[0], memo)
else:
expected_class = args[0]
if expected_class is Any:
return
elif getattr(expected_class, "_is_protocol", False):
check_protocol(value, expected_class, (), memo)
elif isinstance(expected_class, TypeVar):
check_typevar(value, expected_class, (), memo, subclass_check=True)
elif get_origin(expected_class) is Union:
errors: dict[str, TypeCheckError] = {}
for arg in get_args(expected_class):
if arg is Any:
return
try:
check_class(value, type, (arg,), memo)
return
except TypeCheckError as exc:
errors[get_type_name(arg)] = exc
else:
formatted_errors = indent(
"\n".join(f"{key}: {error}" for key, error in errors.items()), " "
)
raise TypeCheckError(
f"did not match any element in the union:\n{formatted_errors}"
)
elif not issubclass(value, expected_class): # type: ignore[arg-type]
raise TypeCheckError(f"is not a subclass of {qualified_name(expected_class)}")
def check_newtype(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
check_type_internal(value, origin_type.__supertype__, memo)
def check_instance(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not isinstance(value, origin_type):
raise TypeCheckError(f"is not an instance of {qualified_name(origin_type)}")
def check_typevar(
value: Any,
origin_type: TypeVar,
args: tuple[Any, ...],
memo: TypeCheckMemo,
*,
subclass_check: bool = False,
) -> None:
if origin_type.__bound__ is not None:
annotation = (
Type[origin_type.__bound__] if subclass_check else origin_type.__bound__
)
check_type_internal(value, annotation, memo)
elif origin_type.__constraints__:
for constraint in origin_type.__constraints__:
annotation = Type[constraint] if subclass_check else constraint
try:
check_type_internal(value, annotation, memo)
except TypeCheckError:
pass
else:
break
else:
formatted_constraints = ", ".join(
get_type_name(constraint) for constraint in origin_type.__constraints__
)
raise TypeCheckError(
f"does not match any of the constraints " f"({formatted_constraints})"
)
if typing_extensions is None:
def _is_literal_type(typ: object) -> bool:
return typ is typing.Literal
else:
def _is_literal_type(typ: object) -> bool:
return typ is typing.Literal or typ is typing_extensions.Literal
def check_literal(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
def get_literal_args(literal_args: tuple[Any, ...]) -> tuple[Any, ...]:
retval: list[Any] = []
for arg in literal_args:
if _is_literal_type(get_origin(arg)):
retval.extend(get_literal_args(arg.__args__))
elif arg is None or isinstance(arg, (int, str, bytes, bool, Enum)):
retval.append(arg)
else:
raise TypeError(
f"Illegal literal value: {arg}"
) # TypeError here is deliberate
return tuple(retval)
final_args = tuple(get_literal_args(args))
try:
index = final_args.index(value)
except ValueError:
pass
else:
if type(final_args[index]) is type(value):
return
formatted_args = ", ".join(repr(arg) for arg in final_args)
raise TypeCheckError(f"is not any of ({formatted_args})") from None
def check_literal_string(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
check_type_internal(value, str, memo)
def check_typeguard(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
check_type_internal(value, bool, memo)
def check_none(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if value is not None:
raise TypeCheckError("is not None")
def check_number(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if origin_type is complex and not isinstance(value, (complex, float, int)):
raise TypeCheckError("is neither complex, float or int")
elif origin_type is float and not isinstance(value, (float, int)):
raise TypeCheckError("is neither float or int")
def check_io(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if origin_type is TextIO or (origin_type is IO and args == (str,)):
if not isinstance(value, TextIOBase):
raise TypeCheckError("is not a text based I/O object")
elif origin_type is BinaryIO or (origin_type is IO and args == (bytes,)):
if not isinstance(value, (RawIOBase, BufferedIOBase)):
raise TypeCheckError("is not a binary I/O object")
elif not isinstance(value, IOBase):
raise TypeCheckError("is not an I/O object")
def check_protocol(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
# TODO: implement proper compatibility checking and support non-runtime protocols
if getattr(origin_type, "_is_runtime_protocol", False):
if not isinstance(value, origin_type):
raise TypeCheckError(
f"is not compatible with the {origin_type.__qualname__} protocol"
)
else:
warnings.warn(
f"Typeguard cannot check the {origin_type.__qualname__} protocol because "
f"it is a non-runtime protocol. If you would like to type check this "
f"protocol, please use @typing.runtime_checkable",
stacklevel=get_stacklevel(),
)
def check_byteslike(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not isinstance(value, (bytearray, bytes, memoryview)):
raise TypeCheckError("is not bytes-like")
def check_self(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if memo.self_type is None:
raise TypeCheckError("cannot be checked against Self outside of a method call")
if isclass(value):
if not issubclass(value, memo.self_type):
raise TypeCheckError(
f"is not an instance of the self type "
f"({qualified_name(memo.self_type)})"
)
elif not isinstance(value, memo.self_type):
raise TypeCheckError(
f"is not an instance of the self type ({qualified_name(memo.self_type)})"
)
def check_paramspec(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
pass # No-op for now
def check_instanceof(
value: Any,
origin_type: Any,
args: tuple[Any, ...],
memo: TypeCheckMemo,
) -> None:
if not isinstance(value, origin_type):
raise TypeCheckError(f"is not an instance of {qualified_name(origin_type)}")
def check_type_internal(
value: Any,
annotation: Any,
memo: TypeCheckMemo,
) -> None:
"""
Check that the given object is compatible with the given type annotation.
This function should only be used by type checker callables. Applications should use
:func:`~.check_type` instead.
:param value: the value to check
:param annotation: the type annotation to check against
:param memo: a memo object containing configuration and information necessary for
looking up forward references
"""
if isinstance(annotation, ForwardRef):
try:
annotation = evaluate_forwardref(annotation, memo)
except NameError:
if memo.config.forward_ref_policy is ForwardRefPolicy.ERROR:
raise
elif memo.config.forward_ref_policy is ForwardRefPolicy.WARN:
warnings.warn(
f"Cannot resolve forward reference {annotation.__forward_arg__!r}",
TypeHintWarning,
stacklevel=get_stacklevel(),
)
return
if annotation is Any or annotation is SubclassableAny or isinstance(value, Mock):
return
# Skip type checks if value is an instance of a class that inherits from Any
if not isclass(value) and SubclassableAny in type(value).__bases__:
return
extras: tuple[Any, ...]
origin_type = get_origin(annotation)
if origin_type is Annotated:
annotation, *extras_ = get_args(annotation)
extras = tuple(extras_)
origin_type = get_origin(annotation)
else:
extras = ()
if origin_type is not None:
args = get_args(annotation)
# Compatibility hack to distinguish between unparametrized and empty tuple
# (tuple[()]), necessary due to https://github.com/python/cpython/issues/91137
if origin_type in (tuple, Tuple) and annotation is not Tuple and not args:
args = ((),)
else:
origin_type = annotation
args = ()
for lookup_func in checker_lookup_functions:
checker = lookup_func(origin_type, args, extras)
if checker:
checker(value, origin_type, args, memo)
return
if isclass(origin_type):
if not isinstance(value, origin_type):
raise TypeCheckError(f"is not an instance of {qualified_name(origin_type)}")
elif type(origin_type) is str: # noqa: E721
warnings.warn(
f"Skipping type check against {origin_type!r}; this looks like a "
f"string-form forward reference imported from another module",
TypeHintWarning,
stacklevel=get_stacklevel(),
)
# Equality checks are applied to these
origin_type_checkers = {
bytes: check_byteslike,
AbstractSet: check_set,
BinaryIO: check_io,
Callable: check_callable,
collections.abc.Callable: check_callable,
complex: check_number,
dict: check_mapping,
Dict: check_mapping,
float: check_number,
frozenset: check_set,
IO: check_io,
list: check_list,
List: check_list,
typing.Literal: check_literal,
Mapping: check_mapping,
MutableMapping: check_mapping,
None: check_none,
collections.abc.Mapping: check_mapping,
collections.abc.MutableMapping: check_mapping,
Sequence: check_sequence,
collections.abc.Sequence: check_sequence,
collections.abc.Set: check_set,
set: check_set,
Set: check_set,
TextIO: check_io,
tuple: check_tuple,
Tuple: check_tuple,
type: check_class,
Type: check_class,
Union: check_union,
}
if sys.version_info >= (3, 10):
origin_type_checkers[types.UnionType] = check_uniontype
origin_type_checkers[typing.TypeGuard] = check_typeguard
if sys.version_info >= (3, 11):
origin_type_checkers.update(
{typing.LiteralString: check_literal_string, typing.Self: check_self}
)
if typing_extensions is not None:
# On some Python versions, these may simply be re-exports from typing,
# but exactly which Python versions is subject to change,
# so it's best to err on the safe side
# and update the dictionary on all Python versions
# if typing_extensions is installed
origin_type_checkers[typing_extensions.Literal] = check_literal
origin_type_checkers[typing_extensions.LiteralString] = check_literal_string
origin_type_checkers[typing_extensions.Self] = check_self
origin_type_checkers[typing_extensions.TypeGuard] = check_typeguard
def builtin_checker_lookup(
origin_type: Any, args: tuple[Any, ...], extras: tuple[Any, ...]
) -> TypeCheckerCallable | None:
checker = origin_type_checkers.get(origin_type)
if checker is not None:
return checker
elif is_typeddict(origin_type):
return check_typed_dict
elif isclass(origin_type) and issubclass(
origin_type, Tuple # type: ignore[arg-type]
):
# NamedTuple
return check_tuple
elif getattr(origin_type, "_is_protocol", False):
return check_protocol
elif isinstance(origin_type, ParamSpec):
return check_paramspec
elif isinstance(origin_type, TypeVar):
return check_typevar
elif origin_type.__class__ is NewType:
# typing.NewType on Python 3.10+
return check_newtype
elif (
isfunction(origin_type)
and getattr(origin_type, "__module__", None) == "typing"
and getattr(origin_type, "__qualname__", "").startswith("NewType.")
and hasattr(origin_type, "__supertype__")
):
# typing.NewType on Python 3.9 and below
return check_newtype
return None
checker_lookup_functions.append(builtin_checker_lookup)
def load_plugins() -> None:
"""
Load all type checker lookup functions from entry points.
All entry points from the ``typeguard.checker_lookup`` group are loaded, and the
returned lookup functions are added to :data:`typeguard.checker_lookup_functions`.
.. note:: This function is called implicitly on import, unless the
``TYPEGUARD_DISABLE_PLUGIN_AUTOLOAD`` environment variable is present.
"""
for ep in entry_points(group="typeguard.checker_lookup"):
try:
plugin = ep.load()
except Exception as exc:
warnings.warn(
f"Failed to load plugin {ep.name!r}: " f"{qualified_name(exc)}: {exc}",
stacklevel=2,
)
continue
if not callable(plugin):
warnings.warn(
f"Plugin {ep} returned a non-callable object: {plugin!r}", stacklevel=2
)
continue
checker_lookup_functions.insert(0, plugin)

108
lib/typeguard/_config.py Normal file
View file

@ -0,0 +1,108 @@
from __future__ import annotations
from collections.abc import Iterable
from dataclasses import dataclass
from enum import Enum, auto
from typing import TYPE_CHECKING, TypeVar
if TYPE_CHECKING:
from ._functions import TypeCheckFailCallback
T = TypeVar("T")
class ForwardRefPolicy(Enum):
"""
Defines how unresolved forward references are handled.
Members:
* ``ERROR``: propagate the :exc:`NameError` when the forward reference lookup fails
* ``WARN``: emit a :class:`~.TypeHintWarning` if the forward reference lookup fails
* ``IGNORE``: silently skip checks for unresolveable forward references
"""
ERROR = auto()
WARN = auto()
IGNORE = auto()
class CollectionCheckStrategy(Enum):
"""
Specifies how thoroughly the contents of collections are type checked.
This has an effect on the following built-in checkers:
* ``AbstractSet``
* ``Dict``
* ``List``
* ``Mapping``
* ``Set``
* ``Tuple[<type>, ...]`` (arbitrarily sized tuples)
Members:
* ``FIRST_ITEM``: check only the first item
* ``ALL_ITEMS``: check all items
"""
FIRST_ITEM = auto()
ALL_ITEMS = auto()
def iterate_samples(self, collection: Iterable[T]) -> Iterable[T]:
if self is CollectionCheckStrategy.FIRST_ITEM:
try:
return [next(iter(collection))]
except StopIteration:
return ()
else:
return collection
@dataclass
class TypeCheckConfiguration:
"""
You can change Typeguard's behavior with these settings.
.. attribute:: typecheck_fail_callback
:type: Callable[[TypeCheckError, TypeCheckMemo], Any]
Callable that is called when type checking fails.
Default: ``None`` (the :exc:`~.TypeCheckError` is raised directly)
.. attribute:: forward_ref_policy
:type: ForwardRefPolicy
Specifies what to do when a forward reference fails to resolve.
Default: ``WARN``
.. attribute:: collection_check_strategy
:type: CollectionCheckStrategy
Specifies how thoroughly the contents of collections (list, dict, etc.) are
type checked.
Default: ``FIRST_ITEM``
.. attribute:: debug_instrumentation
:type: bool
If set to ``True``, the code of modules or functions instrumented by typeguard
is printed to ``sys.stderr`` after the instrumentation is done
Requires Python 3.9 or newer.
Default: ``False``
"""
forward_ref_policy: ForwardRefPolicy = ForwardRefPolicy.WARN
typecheck_fail_callback: TypeCheckFailCallback | None = None
collection_check_strategy: CollectionCheckStrategy = (
CollectionCheckStrategy.FIRST_ITEM
)
debug_instrumentation: bool = False
global_config = TypeCheckConfiguration()

View file

@ -0,0 +1,235 @@
from __future__ import annotations
import ast
import inspect
import sys
from collections.abc import Sequence
from functools import partial
from inspect import isclass, isfunction
from types import CodeType, FrameType, FunctionType
from typing import TYPE_CHECKING, Any, Callable, ForwardRef, TypeVar, cast, overload
from warnings import warn
from ._config import CollectionCheckStrategy, ForwardRefPolicy, global_config
from ._exceptions import InstrumentationWarning
from ._functions import TypeCheckFailCallback
from ._transformer import TypeguardTransformer
from ._utils import Unset, function_name, get_stacklevel, is_method_of, unset
if TYPE_CHECKING:
from typeshed.stdlib.types import _Cell
_F = TypeVar("_F")
def typeguard_ignore(f: _F) -> _F:
"""This decorator is a noop during static type-checking."""
return f
else:
from typing import no_type_check as typeguard_ignore # noqa: F401
T_CallableOrType = TypeVar("T_CallableOrType", bound=Callable[..., Any])
def make_cell(value: object) -> _Cell:
return (lambda: value).__closure__[0] # type: ignore[index]
def find_target_function(
new_code: CodeType, target_path: Sequence[str], firstlineno: int
) -> CodeType | None:
target_name = target_path[0]
for const in new_code.co_consts:
if isinstance(const, CodeType):
if const.co_name == target_name:
if const.co_firstlineno == firstlineno:
return const
elif len(target_path) > 1:
target_code = find_target_function(
const, target_path[1:], firstlineno
)
if target_code:
return target_code
return None
def instrument(f: T_CallableOrType) -> FunctionType | str:
if not getattr(f, "__code__", None):
return "no code associated"
elif not getattr(f, "__module__", None):
return "__module__ attribute is not set"
elif f.__code__.co_filename == "<stdin>":
return "cannot instrument functions defined in a REPL"
elif hasattr(f, "__wrapped__"):
return (
"@typechecked only supports instrumenting functions wrapped with "
"@classmethod, @staticmethod or @property"
)
target_path = [item for item in f.__qualname__.split(".") if item != "<locals>"]
module_source = inspect.getsource(sys.modules[f.__module__])
module_ast = ast.parse(module_source)
instrumentor = TypeguardTransformer(target_path, f.__code__.co_firstlineno)
instrumentor.visit(module_ast)
if not instrumentor.target_node or instrumentor.target_lineno is None:
return "instrumentor did not find the target function"
module_code = compile(module_ast, f.__code__.co_filename, "exec", dont_inherit=True)
new_code = find_target_function(
module_code, target_path, instrumentor.target_lineno
)
if not new_code:
return "cannot find the target function in the AST"
if global_config.debug_instrumentation and sys.version_info >= (3, 9):
# Find the matching AST node, then unparse it to source and print to stdout
print(
f"Source code of {f.__qualname__}() after instrumentation:"
"\n----------------------------------------------",
file=sys.stderr,
)
print(ast.unparse(instrumentor.target_node), file=sys.stderr)
print(
"----------------------------------------------",
file=sys.stderr,
)
closure = f.__closure__
if new_code.co_freevars != f.__code__.co_freevars:
# Create a new closure and find values for the new free variables
frame = cast(FrameType, inspect.currentframe())
frame = cast(FrameType, frame.f_back)
frame_locals = cast(FrameType, frame.f_back).f_locals
cells: list[_Cell] = []
for key in new_code.co_freevars:
if key in instrumentor.names_used_in_annotations:
# Find the value and make a new cell from it
value = frame_locals.get(key) or ForwardRef(key)
cells.append(make_cell(value))
else:
# Reuse the cell from the existing closure
assert f.__closure__
cells.append(f.__closure__[f.__code__.co_freevars.index(key)])
closure = tuple(cells)
new_function = FunctionType(new_code, f.__globals__, f.__name__, closure=closure)
new_function.__module__ = f.__module__
new_function.__name__ = f.__name__
new_function.__qualname__ = f.__qualname__
new_function.__annotations__ = f.__annotations__
new_function.__doc__ = f.__doc__
new_function.__defaults__ = f.__defaults__
new_function.__kwdefaults__ = f.__kwdefaults__
return new_function
@overload
def typechecked(
*,
forward_ref_policy: ForwardRefPolicy | Unset = unset,
typecheck_fail_callback: TypeCheckFailCallback | Unset = unset,
collection_check_strategy: CollectionCheckStrategy | Unset = unset,
debug_instrumentation: bool | Unset = unset,
) -> Callable[[T_CallableOrType], T_CallableOrType]: ...
@overload
def typechecked(target: T_CallableOrType) -> T_CallableOrType: ...
def typechecked(
target: T_CallableOrType | None = None,
*,
forward_ref_policy: ForwardRefPolicy | Unset = unset,
typecheck_fail_callback: TypeCheckFailCallback | Unset = unset,
collection_check_strategy: CollectionCheckStrategy | Unset = unset,
debug_instrumentation: bool | Unset = unset,
) -> Any:
"""
Instrument the target function to perform run-time type checking.
This decorator recompiles the target function, injecting code to type check
arguments, return values, yield values (excluding ``yield from``) and assignments to
annotated local variables.
This can also be used as a class decorator. This will instrument all type annotated
methods, including :func:`@classmethod <classmethod>`,
:func:`@staticmethod <staticmethod>`, and :class:`@property <property>` decorated
methods in the class.
.. note:: When Python is run in optimized mode (``-O`` or ``-OO``, this decorator
is a no-op). This is a feature meant for selectively introducing type checking
into a code base where the checks aren't meant to be run in production.
:param target: the function or class to enable type checking for
:param forward_ref_policy: override for
:attr:`.TypeCheckConfiguration.forward_ref_policy`
:param typecheck_fail_callback: override for
:attr:`.TypeCheckConfiguration.typecheck_fail_callback`
:param collection_check_strategy: override for
:attr:`.TypeCheckConfiguration.collection_check_strategy`
:param debug_instrumentation: override for
:attr:`.TypeCheckConfiguration.debug_instrumentation`
"""
if target is None:
return partial(
typechecked,
forward_ref_policy=forward_ref_policy,
typecheck_fail_callback=typecheck_fail_callback,
collection_check_strategy=collection_check_strategy,
debug_instrumentation=debug_instrumentation,
)
if not __debug__:
return target
if isclass(target):
for key, attr in target.__dict__.items():
if is_method_of(attr, target):
retval = instrument(attr)
if isfunction(retval):
setattr(target, key, retval)
elif isinstance(attr, (classmethod, staticmethod)):
if is_method_of(attr.__func__, target):
retval = instrument(attr.__func__)
if isfunction(retval):
wrapper = attr.__class__(retval)
setattr(target, key, wrapper)
elif isinstance(attr, property):
kwargs: dict[str, Any] = dict(doc=attr.__doc__)
for name in ("fset", "fget", "fdel"):
property_func = kwargs[name] = getattr(attr, name)
if is_method_of(property_func, target):
retval = instrument(property_func)
if isfunction(retval):
kwargs[name] = retval
setattr(target, key, attr.__class__(**kwargs))
return target
# Find either the first Python wrapper or the actual function
wrapper_class: (
type[classmethod[Any, Any, Any]] | type[staticmethod[Any, Any]] | None
) = None
if isinstance(target, (classmethod, staticmethod)):
wrapper_class = target.__class__
target = target.__func__
retval = instrument(target)
if isinstance(retval, str):
warn(
f"{retval} -- not typechecking {function_name(target)}",
InstrumentationWarning,
stacklevel=get_stacklevel(),
)
return target
if wrapper_class is None:
return retval
else:
return wrapper_class(retval)

View file

@ -0,0 +1,42 @@
from collections import deque
from typing import Deque
class TypeHintWarning(UserWarning):
"""
A warning that is emitted when a type hint in string form could not be resolved to
an actual type.
"""
class TypeCheckWarning(UserWarning):
"""Emitted by typeguard's type checkers when a type mismatch is detected."""
def __init__(self, message: str):
super().__init__(message)
class InstrumentationWarning(UserWarning):
"""Emitted when there's a problem with instrumenting a function for type checks."""
def __init__(self, message: str):
super().__init__(message)
class TypeCheckError(Exception):
"""
Raised by typeguard's type checkers when a type mismatch is detected.
"""
def __init__(self, message: str):
super().__init__(message)
self._path: Deque[str] = deque()
def append_path_element(self, element: str) -> None:
self._path.append(element)
def __str__(self) -> str:
if self._path:
return " of ".join(self._path) + " " + str(self.args[0])
else:
return str(self.args[0])

308
lib/typeguard/_functions.py Normal file
View file

@ -0,0 +1,308 @@
from __future__ import annotations
import sys
import warnings
from typing import Any, Callable, NoReturn, TypeVar, Union, overload
from . import _suppression
from ._checkers import BINARY_MAGIC_METHODS, check_type_internal
from ._config import (
CollectionCheckStrategy,
ForwardRefPolicy,
TypeCheckConfiguration,
)
from ._exceptions import TypeCheckError, TypeCheckWarning
from ._memo import TypeCheckMemo
from ._utils import get_stacklevel, qualified_name
if sys.version_info >= (3, 11):
from typing import Literal, Never, TypeAlias
else:
from typing_extensions import Literal, Never, TypeAlias
T = TypeVar("T")
TypeCheckFailCallback: TypeAlias = Callable[[TypeCheckError, TypeCheckMemo], Any]
@overload
def check_type(
value: object,
expected_type: type[T],
*,
forward_ref_policy: ForwardRefPolicy = ...,
typecheck_fail_callback: TypeCheckFailCallback | None = ...,
collection_check_strategy: CollectionCheckStrategy = ...,
) -> T: ...
@overload
def check_type(
value: object,
expected_type: Any,
*,
forward_ref_policy: ForwardRefPolicy = ...,
typecheck_fail_callback: TypeCheckFailCallback | None = ...,
collection_check_strategy: CollectionCheckStrategy = ...,
) -> Any: ...
def check_type(
value: object,
expected_type: Any,
*,
forward_ref_policy: ForwardRefPolicy = TypeCheckConfiguration().forward_ref_policy,
typecheck_fail_callback: TypeCheckFailCallback | None = (
TypeCheckConfiguration().typecheck_fail_callback
),
collection_check_strategy: CollectionCheckStrategy = (
TypeCheckConfiguration().collection_check_strategy
),
) -> Any:
"""
Ensure that ``value`` matches ``expected_type``.
The types from the :mod:`typing` module do not support :func:`isinstance` or
:func:`issubclass` so a number of type specific checks are required. This function
knows which checker to call for which type.
This function wraps :func:`~.check_type_internal` in the following ways:
* Respects type checking suppression (:func:`~.suppress_type_checks`)
* Forms a :class:`~.TypeCheckMemo` from the current stack frame
* Calls the configured type check fail callback if the check fails
Note that this function is independent of the globally shared configuration in
:data:`typeguard.config`. This means that usage within libraries is safe from being
affected configuration changes made by other libraries or by the integrating
application. Instead, configuration options have the same default values as their
corresponding fields in :class:`TypeCheckConfiguration`.
:param value: value to be checked against ``expected_type``
:param expected_type: a class or generic type instance, or a tuple of such things
:param forward_ref_policy: see :attr:`TypeCheckConfiguration.forward_ref_policy`
:param typecheck_fail_callback:
see :attr`TypeCheckConfiguration.typecheck_fail_callback`
:param collection_check_strategy:
see :attr:`TypeCheckConfiguration.collection_check_strategy`
:return: ``value``, unmodified
:raises TypeCheckError: if there is a type mismatch
"""
if type(expected_type) is tuple:
expected_type = Union[expected_type]
config = TypeCheckConfiguration(
forward_ref_policy=forward_ref_policy,
typecheck_fail_callback=typecheck_fail_callback,
collection_check_strategy=collection_check_strategy,
)
if _suppression.type_checks_suppressed or expected_type is Any:
return value
frame = sys._getframe(1)
memo = TypeCheckMemo(frame.f_globals, frame.f_locals, config=config)
try:
check_type_internal(value, expected_type, memo)
except TypeCheckError as exc:
exc.append_path_element(qualified_name(value, add_class_prefix=True))
if config.typecheck_fail_callback:
config.typecheck_fail_callback(exc, memo)
else:
raise
return value
def check_argument_types(
func_name: str,
arguments: dict[str, tuple[Any, Any]],
memo: TypeCheckMemo,
) -> Literal[True]:
if _suppression.type_checks_suppressed:
return True
for argname, (value, annotation) in arguments.items():
if annotation is NoReturn or annotation is Never:
exc = TypeCheckError(
f"{func_name}() was declared never to be called but it was"
)
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise exc
try:
check_type_internal(value, annotation, memo)
except TypeCheckError as exc:
qualname = qualified_name(value, add_class_prefix=True)
exc.append_path_element(f'argument "{argname}" ({qualname})')
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise
return True
def check_return_type(
func_name: str,
retval: T,
annotation: Any,
memo: TypeCheckMemo,
) -> T:
if _suppression.type_checks_suppressed:
return retval
if annotation is NoReturn or annotation is Never:
exc = TypeCheckError(f"{func_name}() was declared never to return but it did")
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise exc
try:
check_type_internal(retval, annotation, memo)
except TypeCheckError as exc:
# Allow NotImplemented if this is a binary magic method (__eq__() et al)
if retval is NotImplemented and annotation is bool:
# This does (and cannot) not check if it's actually a method
func_name = func_name.rsplit(".", 1)[-1]
if func_name in BINARY_MAGIC_METHODS:
return retval
qualname = qualified_name(retval, add_class_prefix=True)
exc.append_path_element(f"the return value ({qualname})")
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise
return retval
def check_send_type(
func_name: str,
sendval: T,
annotation: Any,
memo: TypeCheckMemo,
) -> T:
if _suppression.type_checks_suppressed:
return sendval
if annotation is NoReturn or annotation is Never:
exc = TypeCheckError(
f"{func_name}() was declared never to be sent a value to but it was"
)
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise exc
try:
check_type_internal(sendval, annotation, memo)
except TypeCheckError as exc:
qualname = qualified_name(sendval, add_class_prefix=True)
exc.append_path_element(f"the value sent to generator ({qualname})")
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise
return sendval
def check_yield_type(
func_name: str,
yieldval: T,
annotation: Any,
memo: TypeCheckMemo,
) -> T:
if _suppression.type_checks_suppressed:
return yieldval
if annotation is NoReturn or annotation is Never:
exc = TypeCheckError(f"{func_name}() was declared never to yield but it did")
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise exc
try:
check_type_internal(yieldval, annotation, memo)
except TypeCheckError as exc:
qualname = qualified_name(yieldval, add_class_prefix=True)
exc.append_path_element(f"the yielded value ({qualname})")
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise
return yieldval
def check_variable_assignment(
value: object, varname: str, annotation: Any, memo: TypeCheckMemo
) -> Any:
if _suppression.type_checks_suppressed:
return value
try:
check_type_internal(value, annotation, memo)
except TypeCheckError as exc:
qualname = qualified_name(value, add_class_prefix=True)
exc.append_path_element(f"value assigned to {varname} ({qualname})")
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise
return value
def check_multi_variable_assignment(
value: Any, targets: list[dict[str, Any]], memo: TypeCheckMemo
) -> Any:
if max(len(target) for target in targets) == 1:
iterated_values = [value]
else:
iterated_values = list(value)
if not _suppression.type_checks_suppressed:
for expected_types in targets:
value_index = 0
for ann_index, (varname, expected_type) in enumerate(
expected_types.items()
):
if varname.startswith("*"):
varname = varname[1:]
keys_left = len(expected_types) - 1 - ann_index
next_value_index = len(iterated_values) - keys_left
obj: object = iterated_values[value_index:next_value_index]
value_index = next_value_index
else:
obj = iterated_values[value_index]
value_index += 1
try:
check_type_internal(obj, expected_type, memo)
except TypeCheckError as exc:
qualname = qualified_name(obj, add_class_prefix=True)
exc.append_path_element(f"value assigned to {varname} ({qualname})")
if memo.config.typecheck_fail_callback:
memo.config.typecheck_fail_callback(exc, memo)
else:
raise
return iterated_values[0] if len(iterated_values) == 1 else iterated_values
def warn_on_error(exc: TypeCheckError, memo: TypeCheckMemo) -> None:
"""
Emit a warning on a type mismatch.
This is intended to be used as an error handler in
:attr:`TypeCheckConfiguration.typecheck_fail_callback`.
"""
warnings.warn(TypeCheckWarning(str(exc)), stacklevel=get_stacklevel())

View file

@ -0,0 +1,213 @@
from __future__ import annotations
import ast
import sys
import types
from collections.abc import Callable, Iterable
from importlib.abc import MetaPathFinder
from importlib.machinery import ModuleSpec, SourceFileLoader
from importlib.util import cache_from_source, decode_source
from inspect import isclass
from os import PathLike
from types import CodeType, ModuleType, TracebackType
from typing import Sequence, TypeVar
from unittest.mock import patch
from ._config import global_config
from ._transformer import TypeguardTransformer
if sys.version_info >= (3, 12):
from collections.abc import Buffer
else:
from typing_extensions import Buffer
if sys.version_info >= (3, 11):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec
if sys.version_info >= (3, 10):
from importlib.metadata import PackageNotFoundError, version
else:
from importlib_metadata import PackageNotFoundError, version
try:
OPTIMIZATION = "typeguard" + "".join(version("typeguard").split(".")[:3])
except PackageNotFoundError:
OPTIMIZATION = "typeguard"
P = ParamSpec("P")
T = TypeVar("T")
# The name of this function is magical
def _call_with_frames_removed(
f: Callable[P, T], *args: P.args, **kwargs: P.kwargs
) -> T:
return f(*args, **kwargs)
def optimized_cache_from_source(path: str, debug_override: bool | None = None) -> str:
return cache_from_source(path, debug_override, optimization=OPTIMIZATION)
class TypeguardLoader(SourceFileLoader):
@staticmethod
def source_to_code(
data: Buffer | str | ast.Module | ast.Expression | ast.Interactive,
path: Buffer | str | PathLike[str] = "<string>",
) -> CodeType:
if isinstance(data, (ast.Module, ast.Expression, ast.Interactive)):
tree = data
else:
if isinstance(data, str):
source = data
else:
source = decode_source(data)
tree = _call_with_frames_removed(
ast.parse,
source,
path,
"exec",
)
tree = TypeguardTransformer().visit(tree)
ast.fix_missing_locations(tree)
if global_config.debug_instrumentation and sys.version_info >= (3, 9):
print(
f"Source code of {path!r} after instrumentation:\n"
"----------------------------------------------",
file=sys.stderr,
)
print(ast.unparse(tree), file=sys.stderr)
print("----------------------------------------------", file=sys.stderr)
return _call_with_frames_removed(
compile, tree, path, "exec", 0, dont_inherit=True
)
def exec_module(self, module: ModuleType) -> None:
# Use a custom optimization marker the import lock should make this monkey
# patch safe
with patch(
"importlib._bootstrap_external.cache_from_source",
optimized_cache_from_source,
):
super().exec_module(module)
class TypeguardFinder(MetaPathFinder):
"""
Wraps another path finder and instruments the module with
:func:`@typechecked <typeguard.typechecked>` if :meth:`should_instrument` returns
``True``.
Should not be used directly, but rather via :func:`~.install_import_hook`.
.. versionadded:: 2.6
"""
def __init__(self, packages: list[str] | None, original_pathfinder: MetaPathFinder):
self.packages = packages
self._original_pathfinder = original_pathfinder
def find_spec(
self,
fullname: str,
path: Sequence[str] | None,
target: types.ModuleType | None = None,
) -> ModuleSpec | None:
if self.should_instrument(fullname):
spec = self._original_pathfinder.find_spec(fullname, path, target)
if spec is not None and isinstance(spec.loader, SourceFileLoader):
spec.loader = TypeguardLoader(spec.loader.name, spec.loader.path)
return spec
return None
def should_instrument(self, module_name: str) -> bool:
"""
Determine whether the module with the given name should be instrumented.
:param module_name: full name of the module that is about to be imported (e.g.
``xyz.abc``)
"""
if self.packages is None:
return True
for package in self.packages:
if module_name == package or module_name.startswith(package + "."):
return True
return False
class ImportHookManager:
"""
A handle that can be used to uninstall the Typeguard import hook.
"""
def __init__(self, hook: MetaPathFinder):
self.hook = hook
def __enter__(self) -> None:
pass
def __exit__(
self,
exc_type: type[BaseException],
exc_val: BaseException,
exc_tb: TracebackType,
) -> None:
self.uninstall()
def uninstall(self) -> None:
"""Uninstall the import hook."""
try:
sys.meta_path.remove(self.hook)
except ValueError:
pass # already removed
def install_import_hook(
packages: Iterable[str] | None = None,
*,
cls: type[TypeguardFinder] = TypeguardFinder,
) -> ImportHookManager:
"""
Install an import hook that instruments functions for automatic type checking.
This only affects modules loaded **after** this hook has been installed.
:param packages: an iterable of package names to instrument, or ``None`` to
instrument all packages
:param cls: a custom meta path finder class
:return: a context manager that uninstalls the hook on exit (or when you call
``.uninstall()``)
.. versionadded:: 2.6
"""
if packages is None:
target_packages: list[str] | None = None
elif isinstance(packages, str):
target_packages = [packages]
else:
target_packages = list(packages)
for finder in sys.meta_path:
if (
isclass(finder)
and finder.__name__ == "PathFinder"
and hasattr(finder, "find_spec")
):
break
else:
raise RuntimeError("Cannot find a PathFinder in sys.meta_path")
hook = cls(target_packages, finder)
sys.meta_path.insert(0, hook)
return ImportHookManager(hook)

48
lib/typeguard/_memo.py Normal file
View file

@ -0,0 +1,48 @@
from __future__ import annotations
from typing import Any
from typeguard._config import TypeCheckConfiguration, global_config
class TypeCheckMemo:
"""
Contains information necessary for type checkers to do their work.
.. attribute:: globals
:type: dict[str, Any]
Dictionary of global variables to use for resolving forward references.
.. attribute:: locals
:type: dict[str, Any]
Dictionary of local variables to use for resolving forward references.
.. attribute:: self_type
:type: type | None
When running type checks within an instance method or class method, this is the
class object that the first argument (usually named ``self`` or ``cls``) refers
to.
.. attribute:: config
:type: TypeCheckConfiguration
Contains the configuration for a particular set of type checking operations.
"""
__slots__ = "globals", "locals", "self_type", "config"
def __init__(
self,
globals: dict[str, Any],
locals: dict[str, Any],
*,
self_type: type | None = None,
config: TypeCheckConfiguration = global_config,
):
self.globals = globals
self.locals = locals
self.self_type = self_type
self.config = config

View file

@ -0,0 +1,126 @@
from __future__ import annotations
import sys
import warnings
from typing import Any, Literal
from pytest import Config, Parser
from typeguard._config import CollectionCheckStrategy, ForwardRefPolicy, global_config
from typeguard._exceptions import InstrumentationWarning
from typeguard._importhook import install_import_hook
from typeguard._utils import qualified_name, resolve_reference
def pytest_addoption(parser: Parser) -> None:
def add_ini_option(
opt_type: (
Literal["string", "paths", "pathlist", "args", "linelist", "bool"] | None
)
) -> None:
parser.addini(
group.options[-1].names()[0][2:],
group.options[-1].attrs()["help"],
opt_type,
)
group = parser.getgroup("typeguard")
group.addoption(
"--typeguard-packages",
action="store",
help="comma separated name list of packages and modules to instrument for "
"type checking, or :all: to instrument all modules loaded after typeguard",
)
add_ini_option("linelist")
group.addoption(
"--typeguard-debug-instrumentation",
action="store_true",
help="print all instrumented code to stderr",
)
add_ini_option("bool")
group.addoption(
"--typeguard-typecheck-fail-callback",
action="store",
help=(
"a module:varname (e.g. typeguard:warn_on_error) reference to a function "
"that is called (with the exception, and memo object as arguments) to "
"handle a TypeCheckError"
),
)
add_ini_option("string")
group.addoption(
"--typeguard-forward-ref-policy",
action="store",
choices=list(ForwardRefPolicy.__members__),
help=(
"determines how to deal with unresolveable forward references in type "
"annotations"
),
)
add_ini_option("string")
group.addoption(
"--typeguard-collection-check-strategy",
action="store",
choices=list(CollectionCheckStrategy.__members__),
help="determines how thoroughly to check collections (list, dict, etc)",
)
add_ini_option("string")
def pytest_configure(config: Config) -> None:
def getoption(name: str) -> Any:
return config.getoption(name.replace("-", "_")) or config.getini(name)
packages: list[str] | None = []
if packages_option := config.getoption("typeguard_packages"):
packages = [pkg.strip() for pkg in packages_option.split(",")]
elif packages_ini := config.getini("typeguard-packages"):
packages = packages_ini
if packages:
if packages == [":all:"]:
packages = None
else:
already_imported_packages = sorted(
package for package in packages if package in sys.modules
)
if already_imported_packages:
warnings.warn(
f"typeguard cannot check these packages because they are already "
f"imported: {', '.join(already_imported_packages)}",
InstrumentationWarning,
stacklevel=1,
)
install_import_hook(packages=packages)
debug_option = getoption("typeguard-debug-instrumentation")
if debug_option:
global_config.debug_instrumentation = True
fail_callback_option = getoption("typeguard-typecheck-fail-callback")
if fail_callback_option:
callback = resolve_reference(fail_callback_option)
if not callable(callback):
raise TypeError(
f"{fail_callback_option} ({qualified_name(callback.__class__)}) is not "
f"a callable"
)
global_config.typecheck_fail_callback = callback
forward_ref_policy_option = getoption("typeguard-forward-ref-policy")
if forward_ref_policy_option:
forward_ref_policy = ForwardRefPolicy.__members__[forward_ref_policy_option]
global_config.forward_ref_policy = forward_ref_policy
collection_check_strategy_option = getoption("typeguard-collection-check-strategy")
if collection_check_strategy_option:
collection_check_strategy = CollectionCheckStrategy.__members__[
collection_check_strategy_option
]
global_config.collection_check_strategy = collection_check_strategy

View file

@ -0,0 +1,86 @@
from __future__ import annotations
import sys
from collections.abc import Callable, Generator
from contextlib import contextmanager
from functools import update_wrapper
from threading import Lock
from typing import ContextManager, TypeVar, overload
if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
from typing_extensions import ParamSpec
P = ParamSpec("P")
T = TypeVar("T")
type_checks_suppressed = 0
type_checks_suppress_lock = Lock()
@overload
def suppress_type_checks(func: Callable[P, T]) -> Callable[P, T]: ...
@overload
def suppress_type_checks() -> ContextManager[None]: ...
def suppress_type_checks(
func: Callable[P, T] | None = None
) -> Callable[P, T] | ContextManager[None]:
"""
Temporarily suppress all type checking.
This function has two operating modes, based on how it's used:
#. as a context manager (``with suppress_type_checks(): ...``)
#. as a decorator (``@suppress_type_checks``)
When used as a context manager, :func:`check_type` and any automatically
instrumented functions skip the actual type checking. These context managers can be
nested.
When used as a decorator, all type checking is suppressed while the function is
running.
Type checking will resume once no more context managers are active and no decorated
functions are running.
Both operating modes are thread-safe.
"""
def wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
global type_checks_suppressed
with type_checks_suppress_lock:
type_checks_suppressed += 1
assert func is not None
try:
return func(*args, **kwargs)
finally:
with type_checks_suppress_lock:
type_checks_suppressed -= 1
def cm() -> Generator[None, None, None]:
global type_checks_suppressed
with type_checks_suppress_lock:
type_checks_suppressed += 1
try:
yield
finally:
with type_checks_suppress_lock:
type_checks_suppressed -= 1
if func is None:
# Context manager mode
return contextmanager(cm)()
else:
# Decorator mode
update_wrapper(wrapper, func)
return wrapper

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,55 @@
"""
Transforms lazily evaluated PEP 604 unions into typing.Unions, for compatibility with
Python versions older than 3.10.
"""
from __future__ import annotations
from ast import (
BinOp,
BitOr,
Index,
Load,
Name,
NodeTransformer,
Subscript,
fix_missing_locations,
parse,
)
from ast import Tuple as ASTTuple
from types import CodeType
from typing import Any, Dict, FrozenSet, List, Set, Tuple, Union
type_substitutions = {
"dict": Dict,
"list": List,
"tuple": Tuple,
"set": Set,
"frozenset": FrozenSet,
"Union": Union,
}
class UnionTransformer(NodeTransformer):
def __init__(self, union_name: Name | None = None):
self.union_name = union_name or Name(id="Union", ctx=Load())
def visit_BinOp(self, node: BinOp) -> Any:
self.generic_visit(node)
if isinstance(node.op, BitOr):
return Subscript(
value=self.union_name,
slice=Index(
ASTTuple(elts=[node.left, node.right], ctx=Load()), ctx=Load()
),
ctx=Load(),
)
return node
def compile_type_hint(hint: str) -> CodeType:
parsed = parse(hint, "<string>", "eval")
UnionTransformer().visit(parsed)
fix_missing_locations(parsed)
return compile(parsed, "<string>", "eval", flags=0)

163
lib/typeguard/_utils.py Normal file
View file

@ -0,0 +1,163 @@
from __future__ import annotations
import inspect
import sys
from importlib import import_module
from inspect import currentframe
from types import CodeType, FrameType, FunctionType
from typing import TYPE_CHECKING, Any, Callable, ForwardRef, Union, cast, final
from weakref import WeakValueDictionary
if TYPE_CHECKING:
from ._memo import TypeCheckMemo
if sys.version_info >= (3, 10):
from typing import get_args, get_origin
def evaluate_forwardref(forwardref: ForwardRef, memo: TypeCheckMemo) -> Any:
return forwardref._evaluate(memo.globals, memo.locals, frozenset())
else:
from typing_extensions import get_args, get_origin
evaluate_extra_args: tuple[frozenset[Any], ...] = (
(frozenset(),) if sys.version_info >= (3, 9) else ()
)
def evaluate_forwardref(forwardref: ForwardRef, memo: TypeCheckMemo) -> Any:
from ._union_transformer import compile_type_hint, type_substitutions
if not forwardref.__forward_evaluated__:
forwardref.__forward_code__ = compile_type_hint(forwardref.__forward_arg__)
try:
return forwardref._evaluate(memo.globals, memo.locals, *evaluate_extra_args)
except NameError:
if sys.version_info < (3, 10):
# Try again, with the type substitutions (list -> List etc.) in place
new_globals = memo.globals.copy()
new_globals.setdefault("Union", Union)
if sys.version_info < (3, 9):
new_globals.update(type_substitutions)
return forwardref._evaluate(
new_globals, memo.locals or new_globals, *evaluate_extra_args
)
raise
_functions_map: WeakValueDictionary[CodeType, FunctionType] = WeakValueDictionary()
def get_type_name(type_: Any) -> str:
name: str
for attrname in "__name__", "_name", "__forward_arg__":
candidate = getattr(type_, attrname, None)
if isinstance(candidate, str):
name = candidate
break
else:
origin = get_origin(type_)
candidate = getattr(origin, "_name", None)
if candidate is None:
candidate = type_.__class__.__name__.strip("_")
if isinstance(candidate, str):
name = candidate
else:
return "(unknown)"
args = get_args(type_)
if args:
if name == "Literal":
formatted_args = ", ".join(repr(arg) for arg in args)
else:
formatted_args = ", ".join(get_type_name(arg) for arg in args)
name += f"[{formatted_args}]"
module = getattr(type_, "__module__", None)
if module and module not in (None, "typing", "typing_extensions", "builtins"):
name = module + "." + name
return name
def qualified_name(obj: Any, *, add_class_prefix: bool = False) -> str:
"""
Return the qualified name (e.g. package.module.Type) for the given object.
Builtins and types from the :mod:`typing` package get special treatment by having
the module name stripped from the generated name.
"""
if obj is None:
return "None"
elif inspect.isclass(obj):
prefix = "class " if add_class_prefix else ""
type_ = obj
else:
prefix = ""
type_ = type(obj)
module = type_.__module__
qualname = type_.__qualname__
name = qualname if module in ("typing", "builtins") else f"{module}.{qualname}"
return prefix + name
def function_name(func: Callable[..., Any]) -> str:
"""
Return the qualified name of the given function.
Builtins and types from the :mod:`typing` package get special treatment by having
the module name stripped from the generated name.
"""
# For partial functions and objects with __call__ defined, __qualname__ does not
# exist
module = getattr(func, "__module__", "")
qualname = (module + ".") if module not in ("builtins", "") else ""
return qualname + getattr(func, "__qualname__", repr(func))
def resolve_reference(reference: str) -> Any:
modulename, varname = reference.partition(":")[::2]
if not modulename or not varname:
raise ValueError(f"{reference!r} is not a module:varname reference")
obj = import_module(modulename)
for attr in varname.split("."):
obj = getattr(obj, attr)
return obj
def is_method_of(obj: object, cls: type) -> bool:
return (
inspect.isfunction(obj)
and obj.__module__ == cls.__module__
and obj.__qualname__.startswith(cls.__qualname__ + ".")
)
def get_stacklevel() -> int:
level = 1
frame = cast(FrameType, currentframe()).f_back
while frame and frame.f_globals.get("__name__", "").startswith("typeguard."):
level += 1
frame = frame.f_back
return level
@final
class Unset:
__slots__ = ()
def __repr__(self) -> str:
return "<unset>"
unset = Unset()

0
lib/typeguard/py.typed Normal file
View file