diff --git a/lib/jaraco/functools.py b/lib/jaraco/functools.py index fcdbb4f9..6ae9e31d 100644 --- a/lib/jaraco/functools.py +++ b/lib/jaraco/functools.py @@ -4,6 +4,7 @@ import inspect import collections import types import itertools +import warnings import more_itertools @@ -266,11 +267,33 @@ def result_invoke(action): return wrap -def call_aside(f, *args, **kwargs): +def invoke(f, *args, **kwargs): """ Call a function for its side effect after initialization. - >>> @call_aside + The benefit of using the decorator instead of simply invoking a function + after defining it is that it makes explicit the author's intent for the + function to be called immediately. Whereas if one simply calls the + function immediately, it's less obvious if that was intentional or + incidental. It also avoids repeating the name - the two actions, defining + the function and calling it immediately are modeled separately, but linked + by the decorator construct. + + The benefit of having a function construct (opposed to just invoking some + behavior inline) is to serve as a scope in which the behavior occurs. It + avoids polluting the global namespace with local variables, provides an + anchor on which to attach documentation (docstring), keeps the behavior + logically separated (instead of conceptually separated or not separated at + all), and provides potential to re-use the behavior for testing or other + purposes. + + This function is named as a pithy way to communicate, "call this function + primarily for its side effect", or "while defining this function, also + take it aside and call it". It exists because there's no Python construct + for "define and call" (nor should there be, as decorators serve this need + just fine). The behavior happens immediately and synchronously. + + >>> @invoke ... def func(): print("called") called >>> func() @@ -278,7 +301,7 @@ def call_aside(f, *args, **kwargs): Use functools.partial to pass parameters to the initial call - >>> @functools.partial(call_aside, name='bingo') + >>> @functools.partial(invoke, name='bingo') ... def func(name): print("called with", name) called with bingo """ @@ -286,6 +309,14 @@ def call_aside(f, *args, **kwargs): return f +def call_aside(*args, **kwargs): + """ + Deprecated name for invoke. + """ + warnings.warn("call_aside is deprecated, use invoke", DeprecationWarning) + return invoke(*args, **kwargs) + + class Throttler: """ Rate-limit a function (or other callable) diff --git a/lib/more_itertools/__init__.py b/lib/more_itertools/__init__.py index 557bfc20..66443971 100644 --- a/lib/more_itertools/__init__.py +++ b/lib/more_itertools/__init__.py @@ -3,4 +3,4 @@ from .more import * # noqa from .recipes import * # noqa -__version__ = '9.0.0' +__version__ = '9.1.0' diff --git a/lib/more_itertools/more.py b/lib/more_itertools/more.py index 7f73d6ed..cc8b8b48 100755 --- a/lib/more_itertools/more.py +++ b/lib/more_itertools/more.py @@ -68,6 +68,7 @@ __all__ = [ 'exactly_n', 'filter_except', 'first', + 'gray_product', 'groupby_transform', 'ichunked', 'iequals', @@ -658,6 +659,7 @@ def distinct_permutations(iterable, r=None): [(0, 1), (0, 2), (1, 0), (1, 2), (2, 0), (2, 1)] """ + # Algorithm: https://w.wiki/Qai def _full(A): while True: @@ -1301,7 +1303,7 @@ def split_at(iterable, pred, maxsplit=-1, keep_separator=False): [[0], [2], [4, 5, 6, 7, 8, 9]] By default, the delimiting items are not included in the output. - The include them, set *keep_separator* to ``True``. + To include them, set *keep_separator* to ``True``. >>> list(split_at('abcdcba', lambda x: x == 'b', keep_separator=True)) [['a'], ['b'], ['c', 'd', 'c'], ['b'], ['a']] @@ -1391,7 +1393,9 @@ def split_after(iterable, pred, maxsplit=-1): if pred(item) and buf: yield buf if maxsplit == 1: - yield list(it) + buf = list(it) + if buf: + yield buf return buf = [] maxsplit -= 1 @@ -2914,6 +2918,7 @@ def make_decorator(wrapping_func, result_index=0): '7' """ + # See https://sites.google.com/site/bbayles/index/decorator_factory for # notes on how this works. def decorator(*wrapping_args, **wrapping_kwargs): @@ -3464,7 +3469,6 @@ def _sample_unweighted(iterable, k): next_index = k + floor(log(random()) / log(1 - W)) for index, element in enumerate(iterable, k): - if index == next_index: reservoir[randrange(k)] = element # The new W is the largest in a sample of k U(0, `old_W`) numbers @@ -4284,7 +4288,6 @@ def minmax(iterable_or_value, *others, key=None, default=_marker): lo_key = hi_key = key(lo) for x, y in zip_longest(it, it, fillvalue=lo): - x_key, y_key = key(x), key(y) if y_key < x_key: @@ -4345,3 +4348,45 @@ def constrained_batches( if batch: yield tuple(batch) + + +def gray_product(*iterables): + """Like :func:`itertools.product`, but return tuples in an order such + that only one element in the generated tuple changes from one iteration + to the next. + + >>> list(gray_product('AB','CD')) + [('A', 'C'), ('B', 'C'), ('B', 'D'), ('A', 'D')] + + This function consumes all of the input iterables before producing output. + If any of the input iterables have fewer than two items, ``ValueError`` + is raised. + + For information on the algorithm, see + `this section `__ + of Donald Knuth's *The Art of Computer Programming*. + """ + all_iterables = tuple(tuple(x) for x in iterables) + iterable_count = len(all_iterables) + for iterable in all_iterables: + if len(iterable) < 2: + raise ValueError("each iterable must have two or more items") + + # This is based on "Algorithm H" from section 7.2.1.1, page 20. + # a holds the indexes of the source iterables for the n-tuple to be yielded + # f is the array of "focus pointers" + # o is the array of "directions" + a = [0] * iterable_count + f = list(range(iterable_count + 1)) + o = [1] * iterable_count + while True: + yield tuple(all_iterables[i][a[i]] for i in range(iterable_count)) + j = f[0] + f[0] = 0 + if j == iterable_count: + break + a[j] = a[j] + o[j] + if a[j] == 0 or a[j] == len(all_iterables[j]) - 1: + o[j] = -o[j] + f[j] = f[j + 1] + f[j + 1] = j + 1 diff --git a/lib/more_itertools/more.pyi b/lib/more_itertools/more.pyi index 1413fae7..75c5232c 100644 --- a/lib/more_itertools/more.pyi +++ b/lib/more_itertools/more.pyi @@ -1,26 +1,25 @@ """Stubs for more_itertools.more""" +from __future__ import annotations +from types import TracebackType from typing import ( Any, Callable, Container, - Dict, + ContextManager, Generic, Hashable, Iterable, Iterator, - List, - Optional, + overload, Reversible, Sequence, Sized, - Tuple, - Union, + Type, TypeVar, type_check_only, ) -from types import TracebackType -from typing_extensions import ContextManager, Protocol, Type, overload +from typing_extensions import Protocol # Type and type variable definitions _T = TypeVar('_T') @@ -31,7 +30,7 @@ _V = TypeVar('_V') _W = TypeVar('_W') _T_co = TypeVar('_T_co', covariant=True) _GenFn = TypeVar('_GenFn', bound=Callable[..., Iterator[object]]) -_Raisable = Union[BaseException, 'Type[BaseException]'] +_Raisable = BaseException | Type[BaseException] @type_check_only class _SizedIterable(Protocol[_T_co], Sized, Iterable[_T_co]): ... @@ -39,23 +38,25 @@ class _SizedIterable(Protocol[_T_co], Sized, Iterable[_T_co]): ... @type_check_only class _SizedReversible(Protocol[_T_co], Sized, Reversible[_T_co]): ... +@type_check_only +class _SupportsSlicing(Protocol[_T_co]): + def __getitem__(self, __k: slice) -> _T_co: ... + def chunked( - iterable: Iterable[_T], n: Optional[int], strict: bool = ... -) -> Iterator[List[_T]]: ... + iterable: Iterable[_T], n: int | None, strict: bool = ... +) -> Iterator[list[_T]]: ... @overload def first(iterable: Iterable[_T]) -> _T: ... @overload -def first(iterable: Iterable[_T], default: _U) -> Union[_T, _U]: ... +def first(iterable: Iterable[_T], default: _U) -> _T | _U: ... @overload def last(iterable: Iterable[_T]) -> _T: ... @overload -def last(iterable: Iterable[_T], default: _U) -> Union[_T, _U]: ... +def last(iterable: Iterable[_T], default: _U) -> _T | _U: ... @overload def nth_or_last(iterable: Iterable[_T], n: int) -> _T: ... @overload -def nth_or_last( - iterable: Iterable[_T], n: int, default: _U -) -> Union[_T, _U]: ... +def nth_or_last(iterable: Iterable[_T], n: int, default: _U) -> _T | _U: ... class peekable(Generic[_T], Iterator[_T]): def __init__(self, iterable: Iterable[_T]) -> None: ... @@ -64,13 +65,13 @@ class peekable(Generic[_T], Iterator[_T]): @overload def peek(self) -> _T: ... @overload - def peek(self, default: _U) -> Union[_T, _U]: ... + def peek(self, default: _U) -> _T | _U: ... def prepend(self, *items: _T) -> None: ... def __next__(self) -> _T: ... @overload def __getitem__(self, index: int) -> _T: ... @overload - def __getitem__(self, index: slice) -> List[_T]: ... + def __getitem__(self, index: slice) -> list[_T]: ... def consumer(func: _GenFn) -> _GenFn: ... def ilen(iterable: Iterable[object]) -> int: ... @@ -80,42 +81,42 @@ def with_iter( ) -> Iterator[_T]: ... def one( iterable: Iterable[_T], - too_short: Optional[_Raisable] = ..., - too_long: Optional[_Raisable] = ..., + too_short: _Raisable | None = ..., + too_long: _Raisable | None = ..., ) -> _T: ... def raise_(exception: _Raisable, *args: Any) -> None: ... def strictly_n( iterable: Iterable[_T], n: int, - too_short: Optional[_GenFn] = ..., - too_long: Optional[_GenFn] = ..., -) -> List[_T]: ... + too_short: _GenFn | None = ..., + too_long: _GenFn | None = ..., +) -> list[_T]: ... def distinct_permutations( - iterable: Iterable[_T], r: Optional[int] = ... -) -> Iterator[Tuple[_T, ...]]: ... + iterable: Iterable[_T], r: int | None = ... +) -> Iterator[tuple[_T, ...]]: ... def intersperse( e: _U, iterable: Iterable[_T], n: int = ... -) -> Iterator[Union[_T, _U]]: ... -def unique_to_each(*iterables: Iterable[_T]) -> List[List[_T]]: ... +) -> Iterator[_T | _U]: ... +def unique_to_each(*iterables: Iterable[_T]) -> list[list[_T]]: ... @overload def windowed( seq: Iterable[_T], n: int, *, step: int = ... -) -> Iterator[Tuple[Optional[_T], ...]]: ... +) -> Iterator[tuple[_T | None, ...]]: ... @overload def windowed( seq: Iterable[_T], n: int, fillvalue: _U, step: int = ... -) -> Iterator[Tuple[Union[_T, _U], ...]]: ... -def substrings(iterable: Iterable[_T]) -> Iterator[Tuple[_T, ...]]: ... +) -> Iterator[tuple[_T | _U, ...]]: ... +def substrings(iterable: Iterable[_T]) -> Iterator[tuple[_T, ...]]: ... def substrings_indexes( seq: Sequence[_T], reverse: bool = ... -) -> Iterator[Tuple[Sequence[_T], int, int]]: ... +) -> Iterator[tuple[Sequence[_T], int, int]]: ... class bucket(Generic[_T, _U], Container[_U]): def __init__( self, iterable: Iterable[_T], key: Callable[[_T], _U], - validator: Optional[Callable[[object], object]] = ..., + validator: Callable[[object], object] | None = ..., ) -> None: ... def __contains__(self, value: object) -> bool: ... def __iter__(self) -> Iterator[_U]: ... @@ -123,109 +124,105 @@ class bucket(Generic[_T, _U], Container[_U]): def spy( iterable: Iterable[_T], n: int = ... -) -> Tuple[List[_T], Iterator[_T]]: ... +) -> tuple[list[_T], Iterator[_T]]: ... def interleave(*iterables: Iterable[_T]) -> Iterator[_T]: ... def interleave_longest(*iterables: Iterable[_T]) -> Iterator[_T]: ... def interleave_evenly( - iterables: List[Iterable[_T]], lengths: Optional[List[int]] = ... + iterables: list[Iterable[_T]], lengths: list[int] | None = ... ) -> Iterator[_T]: ... def collapse( iterable: Iterable[Any], - base_type: Optional[type] = ..., - levels: Optional[int] = ..., + base_type: type | None = ..., + levels: int | None = ..., ) -> Iterator[Any]: ... @overload def side_effect( func: Callable[[_T], object], iterable: Iterable[_T], chunk_size: None = ..., - before: Optional[Callable[[], object]] = ..., - after: Optional[Callable[[], object]] = ..., + before: Callable[[], object] | None = ..., + after: Callable[[], object] | None = ..., ) -> Iterator[_T]: ... @overload def side_effect( - func: Callable[[List[_T]], object], + func: Callable[[list[_T]], object], iterable: Iterable[_T], chunk_size: int, - before: Optional[Callable[[], object]] = ..., - after: Optional[Callable[[], object]] = ..., + before: Callable[[], object] | None = ..., + after: Callable[[], object] | None = ..., ) -> Iterator[_T]: ... def sliced( - seq: Sequence[_T], n: int, strict: bool = ... -) -> Iterator[Sequence[_T]]: ... + seq: _SupportsSlicing[_T], n: int, strict: bool = ... +) -> Iterator[_T]: ... def split_at( iterable: Iterable[_T], pred: Callable[[_T], object], maxsplit: int = ..., keep_separator: bool = ..., -) -> Iterator[List[_T]]: ... +) -> Iterator[list[_T]]: ... def split_before( iterable: Iterable[_T], pred: Callable[[_T], object], maxsplit: int = ... -) -> Iterator[List[_T]]: ... +) -> Iterator[list[_T]]: ... def split_after( iterable: Iterable[_T], pred: Callable[[_T], object], maxsplit: int = ... -) -> Iterator[List[_T]]: ... +) -> Iterator[list[_T]]: ... def split_when( iterable: Iterable[_T], pred: Callable[[_T, _T], object], maxsplit: int = ..., -) -> Iterator[List[_T]]: ... +) -> Iterator[list[_T]]: ... def split_into( - iterable: Iterable[_T], sizes: Iterable[Optional[int]] -) -> Iterator[List[_T]]: ... + iterable: Iterable[_T], sizes: Iterable[int | None] +) -> Iterator[list[_T]]: ... @overload def padded( iterable: Iterable[_T], *, - n: Optional[int] = ..., + n: int | None = ..., next_multiple: bool = ..., -) -> Iterator[Optional[_T]]: ... +) -> Iterator[_T | None]: ... @overload def padded( iterable: Iterable[_T], fillvalue: _U, - n: Optional[int] = ..., + n: int | None = ..., next_multiple: bool = ..., -) -> Iterator[Union[_T, _U]]: ... +) -> Iterator[_T | _U]: ... @overload def repeat_last(iterable: Iterable[_T]) -> Iterator[_T]: ... @overload -def repeat_last( - iterable: Iterable[_T], default: _U -) -> Iterator[Union[_T, _U]]: ... -def distribute(n: int, iterable: Iterable[_T]) -> List[Iterator[_T]]: ... +def repeat_last(iterable: Iterable[_T], default: _U) -> Iterator[_T | _U]: ... +def distribute(n: int, iterable: Iterable[_T]) -> list[Iterator[_T]]: ... @overload def stagger( iterable: Iterable[_T], offsets: _SizedIterable[int] = ..., longest: bool = ..., -) -> Iterator[Tuple[Optional[_T], ...]]: ... +) -> Iterator[tuple[_T | None, ...]]: ... @overload def stagger( iterable: Iterable[_T], offsets: _SizedIterable[int] = ..., longest: bool = ..., fillvalue: _U = ..., -) -> Iterator[Tuple[Union[_T, _U], ...]]: ... +) -> Iterator[tuple[_T | _U, ...]]: ... class UnequalIterablesError(ValueError): - def __init__( - self, details: Optional[Tuple[int, int, int]] = ... - ) -> None: ... + def __init__(self, details: tuple[int, int, int] | None = ...) -> None: ... @overload -def zip_equal(__iter1: Iterable[_T1]) -> Iterator[Tuple[_T1]]: ... +def zip_equal(__iter1: Iterable[_T1]) -> Iterator[tuple[_T1]]: ... @overload def zip_equal( __iter1: Iterable[_T1], __iter2: Iterable[_T2] -) -> Iterator[Tuple[_T1, _T2]]: ... +) -> Iterator[tuple[_T1, _T2]]: ... @overload def zip_equal( __iter1: Iterable[_T], __iter2: Iterable[_T], __iter3: Iterable[_T], *iterables: Iterable[_T], -) -> Iterator[Tuple[_T, ...]]: ... +) -> Iterator[tuple[_T, ...]]: ... @overload def zip_offset( __iter1: Iterable[_T1], @@ -233,7 +230,7 @@ def zip_offset( offsets: _SizedIterable[int], longest: bool = ..., fillvalue: None = None, -) -> Iterator[Tuple[Optional[_T1]]]: ... +) -> Iterator[tuple[_T1 | None]]: ... @overload def zip_offset( __iter1: Iterable[_T1], @@ -242,7 +239,7 @@ def zip_offset( offsets: _SizedIterable[int], longest: bool = ..., fillvalue: None = None, -) -> Iterator[Tuple[Optional[_T1], Optional[_T2]]]: ... +) -> Iterator[tuple[_T1 | None, _T2 | None]]: ... @overload def zip_offset( __iter1: Iterable[_T], @@ -252,7 +249,7 @@ def zip_offset( offsets: _SizedIterable[int], longest: bool = ..., fillvalue: None = None, -) -> Iterator[Tuple[Optional[_T], ...]]: ... +) -> Iterator[tuple[_T | None, ...]]: ... @overload def zip_offset( __iter1: Iterable[_T1], @@ -260,7 +257,7 @@ def zip_offset( offsets: _SizedIterable[int], longest: bool = ..., fillvalue: _U, -) -> Iterator[Tuple[Union[_T1, _U]]]: ... +) -> Iterator[tuple[_T1 | _U]]: ... @overload def zip_offset( __iter1: Iterable[_T1], @@ -269,7 +266,7 @@ def zip_offset( offsets: _SizedIterable[int], longest: bool = ..., fillvalue: _U, -) -> Iterator[Tuple[Union[_T1, _U], Union[_T2, _U]]]: ... +) -> Iterator[tuple[_T1 | _U, _T2 | _U]]: ... @overload def zip_offset( __iter1: Iterable[_T], @@ -279,82 +276,80 @@ def zip_offset( offsets: _SizedIterable[int], longest: bool = ..., fillvalue: _U, -) -> Iterator[Tuple[Union[_T, _U], ...]]: ... +) -> Iterator[tuple[_T | _U, ...]]: ... def sort_together( iterables: Iterable[Iterable[_T]], key_list: Iterable[int] = ..., - key: Optional[Callable[..., Any]] = ..., + key: Callable[..., Any] | None = ..., reverse: bool = ..., -) -> List[Tuple[_T, ...]]: ... -def unzip(iterable: Iterable[Sequence[_T]]) -> Tuple[Iterator[_T], ...]: ... -def divide(n: int, iterable: Iterable[_T]) -> List[Iterator[_T]]: ... +) -> list[tuple[_T, ...]]: ... +def unzip(iterable: Iterable[Sequence[_T]]) -> tuple[Iterator[_T], ...]: ... +def divide(n: int, iterable: Iterable[_T]) -> list[Iterator[_T]]: ... def always_iterable( obj: object, - base_type: Union[ - type, Tuple[Union[type, Tuple[Any, ...]], ...], None - ] = ..., + base_type: type | tuple[type | tuple[Any, ...], ...] | None = ..., ) -> Iterator[Any]: ... def adjacent( predicate: Callable[[_T], bool], iterable: Iterable[_T], distance: int = ..., -) -> Iterator[Tuple[bool, _T]]: ... +) -> Iterator[tuple[bool, _T]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: None = None, valuefunc: None = None, reducefunc: None = None, -) -> Iterator[Tuple[_T, Iterator[_T]]]: ... +) -> Iterator[tuple[_T, Iterator[_T]]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: Callable[[_T], _U], valuefunc: None, reducefunc: None, -) -> Iterator[Tuple[_U, Iterator[_T]]]: ... +) -> Iterator[tuple[_U, Iterator[_T]]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: None, valuefunc: Callable[[_T], _V], reducefunc: None, -) -> Iterable[Tuple[_T, Iterable[_V]]]: ... +) -> Iterable[tuple[_T, Iterable[_V]]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: Callable[[_T], _U], valuefunc: Callable[[_T], _V], reducefunc: None, -) -> Iterable[Tuple[_U, Iterator[_V]]]: ... +) -> Iterable[tuple[_U, Iterator[_V]]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: None, valuefunc: None, reducefunc: Callable[[Iterator[_T]], _W], -) -> Iterable[Tuple[_T, _W]]: ... +) -> Iterable[tuple[_T, _W]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: Callable[[_T], _U], valuefunc: None, reducefunc: Callable[[Iterator[_T]], _W], -) -> Iterable[Tuple[_U, _W]]: ... +) -> Iterable[tuple[_U, _W]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: None, valuefunc: Callable[[_T], _V], reducefunc: Callable[[Iterable[_V]], _W], -) -> Iterable[Tuple[_T, _W]]: ... +) -> Iterable[tuple[_T, _W]]: ... @overload def groupby_transform( iterable: Iterable[_T], keyfunc: Callable[[_T], _U], valuefunc: Callable[[_T], _V], reducefunc: Callable[[Iterable[_V]], _W], -) -> Iterable[Tuple[_U, _W]]: ... +) -> Iterable[tuple[_U, _W]]: ... class numeric_range(Generic[_T, _U], Sequence[_T], Hashable, Reversible[_T]): @overload @@ -375,22 +370,22 @@ class numeric_range(Generic[_T, _U], Sequence[_T], Hashable, Reversible[_T]): def __len__(self) -> int: ... def __reduce__( self, - ) -> Tuple[Type[numeric_range[_T, _U]], Tuple[_T, _T, _U]]: ... + ) -> tuple[Type[numeric_range[_T, _U]], tuple[_T, _T, _U]]: ... def __repr__(self) -> str: ... def __reversed__(self) -> Iterator[_T]: ... def count(self, value: _T) -> int: ... def index(self, value: _T) -> int: ... # type: ignore def count_cycle( - iterable: Iterable[_T], n: Optional[int] = ... -) -> Iterable[Tuple[int, _T]]: ... + iterable: Iterable[_T], n: int | None = ... +) -> Iterable[tuple[int, _T]]: ... def mark_ends( iterable: Iterable[_T], -) -> Iterable[Tuple[bool, bool, _T]]: ... +) -> Iterable[tuple[bool, bool, _T]]: ... def locate( iterable: Iterable[object], pred: Callable[..., Any] = ..., - window_size: Optional[int] = ..., + window_size: int | None = ..., ) -> Iterator[int]: ... def lstrip( iterable: Iterable[_T], pred: Callable[[_T], object] @@ -403,9 +398,7 @@ def strip( ) -> Iterator[_T]: ... class islice_extended(Generic[_T], Iterator[_T]): - def __init__( - self, iterable: Iterable[_T], *args: Optional[int] - ) -> None: ... + def __init__(self, iterable: Iterable[_T], *args: int | None) -> None: ... def __iter__(self) -> islice_extended[_T]: ... def __next__(self) -> _T: ... def __getitem__(self, index: slice) -> islice_extended[_T]: ... @@ -420,7 +413,7 @@ def difference( func: Callable[[_T, _T], _U] = ..., *, initial: None = ..., -) -> Iterator[Union[_T, _U]]: ... +) -> Iterator[_T | _U]: ... @overload def difference( iterable: Iterable[_T], func: Callable[[_T, _T], _U] = ..., *, initial: _U @@ -436,7 +429,7 @@ class SequenceView(Generic[_T], Sequence[_T]): class seekable(Generic[_T], Iterator[_T]): def __init__( - self, iterable: Iterable[_T], maxlen: Optional[int] = ... + self, iterable: Iterable[_T], maxlen: int | None = ... ) -> None: ... def __iter__(self) -> seekable[_T]: ... def __next__(self) -> _T: ... @@ -444,20 +437,20 @@ class seekable(Generic[_T], Iterator[_T]): @overload def peek(self) -> _T: ... @overload - def peek(self, default: _U) -> Union[_T, _U]: ... + def peek(self, default: _U) -> _T | _U: ... def elements(self) -> SequenceView[_T]: ... def seek(self, index: int) -> None: ... class run_length: @staticmethod - def encode(iterable: Iterable[_T]) -> Iterator[Tuple[_T, int]]: ... + def encode(iterable: Iterable[_T]) -> Iterator[tuple[_T, int]]: ... @staticmethod - def decode(iterable: Iterable[Tuple[_T, int]]) -> Iterator[_T]: ... + def decode(iterable: Iterable[tuple[_T, int]]) -> Iterator[_T]: ... def exactly_n( iterable: Iterable[_T], n: int, predicate: Callable[[_T], object] = ... ) -> bool: ... -def circular_shifts(iterable: Iterable[_T]) -> List[Tuple[_T, ...]]: ... +def circular_shifts(iterable: Iterable[_T]) -> list[tuple[_T, ...]]: ... def make_decorator( wrapping_func: Callable[..., _U], result_index: int = ... ) -> Callable[..., Callable[[Callable[..., Any]], Callable[..., _U]]]: ... @@ -467,44 +460,44 @@ def map_reduce( keyfunc: Callable[[_T], _U], valuefunc: None = ..., reducefunc: None = ..., -) -> Dict[_U, List[_T]]: ... +) -> dict[_U, list[_T]]: ... @overload def map_reduce( iterable: Iterable[_T], keyfunc: Callable[[_T], _U], valuefunc: Callable[[_T], _V], reducefunc: None = ..., -) -> Dict[_U, List[_V]]: ... +) -> dict[_U, list[_V]]: ... @overload def map_reduce( iterable: Iterable[_T], keyfunc: Callable[[_T], _U], valuefunc: None = ..., - reducefunc: Callable[[List[_T]], _W] = ..., -) -> Dict[_U, _W]: ... + reducefunc: Callable[[list[_T]], _W] = ..., +) -> dict[_U, _W]: ... @overload def map_reduce( iterable: Iterable[_T], keyfunc: Callable[[_T], _U], valuefunc: Callable[[_T], _V], - reducefunc: Callable[[List[_V]], _W], -) -> Dict[_U, _W]: ... + reducefunc: Callable[[list[_V]], _W], +) -> dict[_U, _W]: ... def rlocate( iterable: Iterable[_T], pred: Callable[..., object] = ..., - window_size: Optional[int] = ..., + window_size: int | None = ..., ) -> Iterator[int]: ... def replace( iterable: Iterable[_T], pred: Callable[..., object], substitutes: Iterable[_U], - count: Optional[int] = ..., + count: int | None = ..., window_size: int = ..., -) -> Iterator[Union[_T, _U]]: ... -def partitions(iterable: Iterable[_T]) -> Iterator[List[List[_T]]]: ... +) -> Iterator[_T | _U]: ... +def partitions(iterable: Iterable[_T]) -> Iterator[list[list[_T]]]: ... def set_partitions( - iterable: Iterable[_T], k: Optional[int] = ... -) -> Iterator[List[List[_T]]]: ... + iterable: Iterable[_T], k: int | None = ... +) -> Iterator[list[list[_T]]]: ... class time_limited(Generic[_T], Iterator[_T]): def __init__( @@ -515,16 +508,16 @@ class time_limited(Generic[_T], Iterator[_T]): @overload def only( - iterable: Iterable[_T], *, too_long: Optional[_Raisable] = ... -) -> Optional[_T]: ... + iterable: Iterable[_T], *, too_long: _Raisable | None = ... +) -> _T | None: ... @overload def only( - iterable: Iterable[_T], default: _U, too_long: Optional[_Raisable] = ... -) -> Union[_T, _U]: ... + iterable: Iterable[_T], default: _U, too_long: _Raisable | None = ... +) -> _T | _U: ... def ichunked(iterable: Iterable[_T], n: int) -> Iterator[Iterator[_T]]: ... def distinct_combinations( iterable: Iterable[_T], r: int -) -> Iterator[Tuple[_T, ...]]: ... +) -> Iterator[tuple[_T, ...]]: ... def filter_except( validator: Callable[[Any], object], iterable: Iterable[_T], @@ -539,16 +532,16 @@ def map_if( iterable: Iterable[Any], pred: Callable[[Any], bool], func: Callable[[Any], Any], - func_else: Optional[Callable[[Any], Any]] = ..., + func_else: Callable[[Any], Any] | None = ..., ) -> Iterator[Any]: ... def sample( iterable: Iterable[_T], k: int, - weights: Optional[Iterable[float]] = ..., -) -> List[_T]: ... + weights: Iterable[float] | None = ..., +) -> list[_T]: ... def is_sorted( iterable: Iterable[_T], - key: Optional[Callable[[_T], _U]] = ..., + key: Callable[[_T], _U] | None = ..., reverse: bool = False, strict: bool = False, ) -> bool: ... @@ -566,10 +559,10 @@ class callback_iter(Generic[_T], Iterator[_T]): def __enter__(self) -> callback_iter[_T]: ... def __exit__( self, - exc_type: Optional[Type[BaseException]], - exc_value: Optional[BaseException], - traceback: Optional[TracebackType], - ) -> Optional[bool]: ... + exc_type: Type[BaseException] | None, + exc_value: BaseException | None, + traceback: TracebackType | None, + ) -> bool | None: ... def __iter__(self) -> callback_iter[_T]: ... def __next__(self) -> _T: ... def _reader(self) -> Iterator[_T]: ... @@ -580,15 +573,15 @@ class callback_iter(Generic[_T], Iterator[_T]): def windowed_complete( iterable: Iterable[_T], n: int -) -> Iterator[Tuple[_T, ...]]: ... +) -> Iterator[tuple[_T, ...]]: ... def all_unique( - iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ... + iterable: Iterable[_T], key: Callable[[_T], _U] | None = ... ) -> bool: ... -def nth_product(index: int, *args: Iterable[_T]) -> Tuple[_T, ...]: ... +def nth_product(index: int, *args: Iterable[_T]) -> tuple[_T, ...]: ... def nth_permutation( iterable: Iterable[_T], r: int, index: int -) -> Tuple[_T, ...]: ... -def value_chain(*args: Union[_T, Iterable[_T]]) -> Iterable[_T]: ... +) -> tuple[_T, ...]: ... +def value_chain(*args: _T | Iterable[_T]) -> Iterable[_T]: ... def product_index(element: Iterable[_T], *args: Iterable[_T]) -> int: ... def combination_index( element: Iterable[_T], iterable: Iterable[_T] @@ -603,22 +596,20 @@ class countable(Generic[_T], Iterator[_T]): def __iter__(self) -> countable[_T]: ... def __next__(self) -> _T: ... -def chunked_even(iterable: Iterable[_T], n: int) -> Iterator[List[_T]]: ... +def chunked_even(iterable: Iterable[_T], n: int) -> Iterator[list[_T]]: ... def zip_broadcast( - *objects: Union[_T, Iterable[_T]], - scalar_types: Union[ - type, Tuple[Union[type, Tuple[Any, ...]], ...], None - ] = ..., + *objects: _T | Iterable[_T], + scalar_types: type | tuple[type | tuple[Any, ...], ...] | None = ..., strict: bool = ..., -) -> Iterable[Tuple[_T, ...]]: ... +) -> Iterable[tuple[_T, ...]]: ... def unique_in_window( - iterable: Iterable[_T], n: int, key: Optional[Callable[[_T], _U]] = ... + iterable: Iterable[_T], n: int, key: Callable[[_T], _U] | None = ... ) -> Iterator[_T]: ... def duplicates_everseen( - iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ... + iterable: Iterable[_T], key: Callable[[_T], _U] | None = ... ) -> Iterator[_T]: ... def duplicates_justseen( - iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ... + iterable: Iterable[_T], key: Callable[[_T], _U] | None = ... ) -> Iterator[_T]: ... class _SupportsLessThan(Protocol): @@ -629,38 +620,38 @@ _SupportsLessThanT = TypeVar("_SupportsLessThanT", bound=_SupportsLessThan) @overload def minmax( iterable_or_value: Iterable[_SupportsLessThanT], *, key: None = None -) -> Tuple[_SupportsLessThanT, _SupportsLessThanT]: ... +) -> tuple[_SupportsLessThanT, _SupportsLessThanT]: ... @overload def minmax( iterable_or_value: Iterable[_T], *, key: Callable[[_T], _SupportsLessThan] -) -> Tuple[_T, _T]: ... +) -> tuple[_T, _T]: ... @overload def minmax( iterable_or_value: Iterable[_SupportsLessThanT], *, key: None = None, default: _U, -) -> Union[_U, Tuple[_SupportsLessThanT, _SupportsLessThanT]]: ... +) -> _U | tuple[_SupportsLessThanT, _SupportsLessThanT]: ... @overload def minmax( iterable_or_value: Iterable[_T], *, key: Callable[[_T], _SupportsLessThan], default: _U, -) -> Union[_U, Tuple[_T, _T]]: ... +) -> _U | tuple[_T, _T]: ... @overload def minmax( iterable_or_value: _SupportsLessThanT, __other: _SupportsLessThanT, *others: _SupportsLessThanT, -) -> Tuple[_SupportsLessThanT, _SupportsLessThanT]: ... +) -> tuple[_SupportsLessThanT, _SupportsLessThanT]: ... @overload def minmax( iterable_or_value: _T, __other: _T, *others: _T, key: Callable[[_T], _SupportsLessThan], -) -> Tuple[_T, _T]: ... +) -> tuple[_T, _T]: ... def longest_common_prefix( iterables: Iterable[Iterable[_T]], ) -> Iterator[_T]: ... @@ -668,7 +659,8 @@ def iequals(*iterables: Iterable[object]) -> bool: ... def constrained_batches( iterable: Iterable[object], max_size: int, - max_count: Optional[int] = ..., + max_count: int | None = ..., get_len: Callable[[_T], object] = ..., strict: bool = ..., -) -> Iterator[Tuple[_T]]: ... +) -> Iterator[tuple[_T]]: ... +def gray_product(*iterables: Iterable[_T]) -> Iterator[tuple[_T, ...]]: ... diff --git a/lib/more_itertools/recipes.py b/lib/more_itertools/recipes.py index 85796207..3facc2e3 100644 --- a/lib/more_itertools/recipes.py +++ b/lib/more_itertools/recipes.py @@ -9,6 +9,7 @@ Some backward-compatible usability improvements have been made. """ import math import operator +import warnings from collections import deque from collections.abc import Sized @@ -21,12 +22,14 @@ from itertools import ( cycle, groupby, islice, + product, repeat, starmap, tee, zip_longest, ) from random import randrange, sample, choice +from sys import hexversion __all__ = [ 'all_equal', @@ -36,9 +39,12 @@ __all__ = [ 'convolve', 'dotproduct', 'first_true', + 'factor', 'flatten', 'grouper', 'iter_except', + 'iter_index', + 'matmul', 'ncycles', 'nth', 'nth_combination', @@ -62,6 +68,7 @@ __all__ = [ 'tabulate', 'tail', 'take', + 'transpose', 'triplewise', 'unique_everseen', 'unique_justseen', @@ -808,6 +815,35 @@ def polynomial_from_roots(roots): ] +def iter_index(iterable, value, start=0): + """Yield the index of each place in *iterable* that *value* occurs, + beginning with index *start*. + + See :func:`locate` for a more general means of finding the indexes + associated with particular values. + + >>> list(iter_index('AABCADEAF', 'A')) + [0, 1, 4, 7] + """ + try: + seq_index = iterable.index + except AttributeError: + # Slow path for general iterables + it = islice(iterable, start, None) + for i, element in enumerate(it, start): + if element is value or element == value: + yield i + else: + # Fast path for sequences + i = start - 1 + try: + while True: + i = seq_index(value, i + 1) + yield i + except ValueError: + pass + + def sieve(n): """Yield the primes less than n. @@ -815,13 +851,13 @@ def sieve(n): [2, 3, 5, 7, 11, 13, 17, 19, 23, 29] """ isqrt = getattr(math, 'isqrt', lambda x: int(math.sqrt(x))) + data = bytearray((0, 1)) * (n // 2) + data[:3] = 0, 0, 0 limit = isqrt(n) + 1 - data = bytearray([1]) * n - data[:2] = 0, 0 for p in compress(range(limit), data): - data[p + p : n : p] = bytearray(len(range(p + p, n, p))) - - return compress(count(), data) + data[p * p : n : p + p] = bytes(len(range(p * p, n, p + p))) + data[2] = 1 + return iter_index(data, 1) if n > 2 else iter([]) def batched(iterable, n): @@ -833,9 +869,62 @@ def batched(iterable, n): This recipe is from the ``itertools`` docs. This library also provides :func:`chunked`, which has a different implementation. """ + if hexversion >= 0x30C00A0: # Python 3.12.0a0 + warnings.warn( + ( + 'batched will be removed in a future version of ' + 'more-itertools. Use the standard library ' + 'itertools.batched function instead' + ), + DeprecationWarning, + ) + it = iter(iterable) while True: batch = list(islice(it, n)) if not batch: break yield batch + + +def transpose(it): + """Swap the rows and columns of the input. + + >>> list(transpose([(1, 2, 3), (11, 22, 33)])) + [(1, 11), (2, 22), (3, 33)] + + The caller should ensure that the dimensions of the input are compatible. + """ + # TODO: when 3.9 goes end-of-life, add stric=True to this. + return zip(*it) + + +def matmul(m1, m2): + """Multiply two matrices. + >>> list(matmul([(7, 5), (3, 5)], [(2, 5), (7, 9)])) + [[49, 80], [41, 60]] + + The caller should ensure that the dimensions of the input matrices are + compatible with each other. + """ + n = len(m2[0]) + return batched(starmap(dotproduct, product(m1, transpose(m2))), n) + + +def factor(n): + """Yield the prime factors of n. + >>> list(factor(360)) + [2, 2, 2, 3, 3, 5] + """ + isqrt = getattr(math, 'isqrt', lambda x: int(math.sqrt(x))) + for prime in sieve(isqrt(n) + 1): + while True: + quotient, remainder = divmod(n, prime) + if remainder: + break + yield prime + n = quotient + if n == 1: + return + if n >= 2: + yield n diff --git a/lib/more_itertools/recipes.pyi b/lib/more_itertools/recipes.pyi index 29415c5a..0267ed56 100644 --- a/lib/more_itertools/recipes.pyi +++ b/lib/more_itertools/recipes.pyi @@ -1,110 +1,119 @@ """Stubs for more_itertools.recipes""" +from __future__ import annotations + from typing import ( Any, Callable, Iterable, Iterator, - List, - Optional, + overload, Sequence, - Tuple, + Type, TypeVar, - Union, ) -from typing_extensions import overload, Type # Type and type variable definitions _T = TypeVar('_T') _U = TypeVar('_U') -def take(n: int, iterable: Iterable[_T]) -> List[_T]: ... +def take(n: int, iterable: Iterable[_T]) -> list[_T]: ... def tabulate( function: Callable[[int], _T], start: int = ... ) -> Iterator[_T]: ... def tail(n: int, iterable: Iterable[_T]) -> Iterator[_T]: ... -def consume(iterator: Iterable[object], n: Optional[int] = ...) -> None: ... +def consume(iterator: Iterable[object], n: int | None = ...) -> None: ... @overload -def nth(iterable: Iterable[_T], n: int) -> Optional[_T]: ... +def nth(iterable: Iterable[_T], n: int) -> _T | None: ... @overload -def nth(iterable: Iterable[_T], n: int, default: _U) -> Union[_T, _U]: ... +def nth(iterable: Iterable[_T], n: int, default: _U) -> _T | _U: ... def all_equal(iterable: Iterable[object]) -> bool: ... def quantify( iterable: Iterable[_T], pred: Callable[[_T], bool] = ... ) -> int: ... -def pad_none(iterable: Iterable[_T]) -> Iterator[Optional[_T]]: ... -def padnone(iterable: Iterable[_T]) -> Iterator[Optional[_T]]: ... +def pad_none(iterable: Iterable[_T]) -> Iterator[_T | None]: ... +def padnone(iterable: Iterable[_T]) -> Iterator[_T | None]: ... def ncycles(iterable: Iterable[_T], n: int) -> Iterator[_T]: ... def dotproduct(vec1: Iterable[object], vec2: Iterable[object]) -> object: ... def flatten(listOfLists: Iterable[Iterable[_T]]) -> Iterator[_T]: ... def repeatfunc( - func: Callable[..., _U], times: Optional[int] = ..., *args: Any + func: Callable[..., _U], times: int | None = ..., *args: Any ) -> Iterator[_U]: ... -def pairwise(iterable: Iterable[_T]) -> Iterator[Tuple[_T, _T]]: ... +def pairwise(iterable: Iterable[_T]) -> Iterator[tuple[_T, _T]]: ... def grouper( iterable: Iterable[_T], n: int, incomplete: str = ..., fillvalue: _U = ..., -) -> Iterator[Tuple[Union[_T, _U], ...]]: ... +) -> Iterator[tuple[_T | _U, ...]]: ... def roundrobin(*iterables: Iterable[_T]) -> Iterator[_T]: ... def partition( - pred: Optional[Callable[[_T], object]], iterable: Iterable[_T] -) -> Tuple[Iterator[_T], Iterator[_T]]: ... -def powerset(iterable: Iterable[_T]) -> Iterator[Tuple[_T, ...]]: ... + pred: Callable[[_T], object] | None, iterable: Iterable[_T] +) -> tuple[Iterator[_T], Iterator[_T]]: ... +def powerset(iterable: Iterable[_T]) -> Iterator[tuple[_T, ...]]: ... def unique_everseen( - iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ... + iterable: Iterable[_T], key: Callable[[_T], _U] | None = ... ) -> Iterator[_T]: ... def unique_justseen( - iterable: Iterable[_T], key: Optional[Callable[[_T], object]] = ... + iterable: Iterable[_T], key: Callable[[_T], object] | None = ... ) -> Iterator[_T]: ... @overload def iter_except( func: Callable[[], _T], - exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]], + exception: Type[BaseException] | tuple[Type[BaseException], ...], first: None = ..., ) -> Iterator[_T]: ... @overload def iter_except( func: Callable[[], _T], - exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]], + exception: Type[BaseException] | tuple[Type[BaseException], ...], first: Callable[[], _U], -) -> Iterator[Union[_T, _U]]: ... +) -> Iterator[_T | _U]: ... @overload def first_true( - iterable: Iterable[_T], *, pred: Optional[Callable[[_T], object]] = ... -) -> Optional[_T]: ... + iterable: Iterable[_T], *, pred: Callable[[_T], object] | None = ... +) -> _T | None: ... @overload def first_true( iterable: Iterable[_T], default: _U, - pred: Optional[Callable[[_T], object]] = ..., -) -> Union[_T, _U]: ... + pred: Callable[[_T], object] | None = ..., +) -> _T | _U: ... def random_product( *args: Iterable[_T], repeat: int = ... -) -> Tuple[_T, ...]: ... +) -> tuple[_T, ...]: ... def random_permutation( - iterable: Iterable[_T], r: Optional[int] = ... -) -> Tuple[_T, ...]: ... -def random_combination(iterable: Iterable[_T], r: int) -> Tuple[_T, ...]: ... + iterable: Iterable[_T], r: int | None = ... +) -> tuple[_T, ...]: ... +def random_combination(iterable: Iterable[_T], r: int) -> tuple[_T, ...]: ... def random_combination_with_replacement( iterable: Iterable[_T], r: int -) -> Tuple[_T, ...]: ... +) -> tuple[_T, ...]: ... def nth_combination( iterable: Iterable[_T], r: int, index: int -) -> Tuple[_T, ...]: ... -def prepend(value: _T, iterator: Iterable[_U]) -> Iterator[Union[_T, _U]]: ... +) -> tuple[_T, ...]: ... +def prepend(value: _T, iterator: Iterable[_U]) -> Iterator[_T | _U]: ... def convolve(signal: Iterable[_T], kernel: Iterable[_T]) -> Iterator[_T]: ... def before_and_after( predicate: Callable[[_T], bool], it: Iterable[_T] -) -> Tuple[Iterator[_T], Iterator[_T]]: ... -def triplewise(iterable: Iterable[_T]) -> Iterator[Tuple[_T, _T, _T]]: ... +) -> tuple[Iterator[_T], Iterator[_T]]: ... +def triplewise(iterable: Iterable[_T]) -> Iterator[tuple[_T, _T, _T]]: ... def sliding_window( iterable: Iterable[_T], n: int -) -> Iterator[Tuple[_T, ...]]: ... -def subslices(iterable: Iterable[_T]) -> Iterator[List[_T]]: ... -def polynomial_from_roots(roots: Sequence[int]) -> List[int]: ... +) -> Iterator[tuple[_T, ...]]: ... +def subslices(iterable: Iterable[_T]) -> Iterator[list[_T]]: ... +def polynomial_from_roots(roots: Sequence[int]) -> list[int]: ... +def iter_index( + iterable: Iterable[object], + value: Any, + start: int | None = ..., +) -> Iterator[int]: ... def sieve(n: int) -> Iterator[int]: ... def batched( iterable: Iterable[_T], n: int, -) -> Iterator[List[_T]]: ... +) -> Iterator[list[_T]]: ... +def transpose( + it: Iterable[Iterable[_T]], +) -> tuple[Iterator[_T], ...]: ... +def matmul(m1: Sequence[_T], m2: Sequence[_T]) -> Iterator[list[_T]]: ... +def factor(n: int) -> Iterator[int]: ... diff --git a/lib/tempora/__init__.py b/lib/tempora/__init__.py index cece8bb7..27d5e828 100644 --- a/lib/tempora/__init__.py +++ b/lib/tempora/__init__.py @@ -11,9 +11,6 @@ from typing import Union, Tuple, Iterable from typing import cast -from jaraco.functools import once - - # some useful constants osc_per_year = 290_091_329_207_984_000 """ @@ -36,7 +33,7 @@ seconds_per_month = seconds_per_year / 12 hours_per_month = hours_per_day * days_per_year / 12 -@once +@functools.lru_cache() def _needs_year_help() -> bool: """ Some versions of Python render %Y with only three characters :( diff --git a/lib/tempora/tests/test_schedule.py b/lib/tempora/tests/test_schedule.py deleted file mode 100644 index 0ce35435..00000000 --- a/lib/tempora/tests/test_schedule.py +++ /dev/null @@ -1,149 +0,0 @@ -import time -import random -import datetime -from unittest import mock - -import pytest -import pytz -import freezegun - -from tempora import schedule - - -do_nothing = type(None) - - -def test_delayed_command_order(): - """ - delayed commands should be sorted by delay time - """ - delays = [random.randint(0, 99) for x in range(5)] - cmds = sorted( - [schedule.DelayedCommand.after(delay, do_nothing) for delay in delays] - ) - assert [c.delay.seconds for c in cmds] == sorted(delays) - - -def test_periodic_command_delay(): - "A PeriodicCommand must have a positive, non-zero delay." - with pytest.raises(ValueError) as exc_info: - schedule.PeriodicCommand.after(0, None) - assert str(exc_info.value) == test_periodic_command_delay.__doc__ - - -def test_periodic_command_fixed_delay(): - """ - Test that we can construct a periodic command with a fixed initial - delay. - """ - fd = schedule.PeriodicCommandFixedDelay.at_time( - at=schedule.now(), delay=datetime.timedelta(seconds=2), target=lambda: None - ) - assert fd.due() is True - assert fd.next().due() is False - - -class TestCommands: - def test_delayed_command_from_timestamp(self): - """ - Ensure a delayed command can be constructed from a timestamp. - """ - t = time.time() - schedule.DelayedCommand.at_time(t, do_nothing) - - def test_command_at_noon(self): - """ - Create a periodic command that's run at noon every day. - """ - when = datetime.time(12, 0, tzinfo=pytz.utc) - cmd = schedule.PeriodicCommandFixedDelay.daily_at(when, target=None) - assert cmd.due() is False - next_cmd = cmd.next() - daily = datetime.timedelta(days=1) - day_from_now = schedule.now() + daily - two_days_from_now = day_from_now + daily - assert day_from_now < next_cmd < two_days_from_now - - @pytest.mark.parametrize("hour", range(10, 14)) - @pytest.mark.parametrize("tz_offset", (14, -14)) - def test_command_at_noon_distant_local(self, hour, tz_offset): - """ - Run test_command_at_noon, but with the local timezone - more than 12 hours away from UTC. - """ - with freezegun.freeze_time(f"2020-01-10 {hour:02}:01", tz_offset=tz_offset): - self.test_command_at_noon() - - -class TestTimezones: - def test_alternate_timezone_west(self): - target_tz = pytz.timezone('US/Pacific') - target = schedule.now().astimezone(target_tz) - cmd = schedule.DelayedCommand.at_time(target, target=None) - assert cmd.due() - - def test_alternate_timezone_east(self): - target_tz = pytz.timezone('Europe/Amsterdam') - target = schedule.now().astimezone(target_tz) - cmd = schedule.DelayedCommand.at_time(target, target=None) - assert cmd.due() - - def test_daylight_savings(self): - """ - A command at 9am should always be 9am regardless of - a DST boundary. - """ - with freezegun.freeze_time('2018-03-10 08:00:00'): - target_tz = pytz.timezone('US/Eastern') - target_time = datetime.time(9, tzinfo=target_tz) - cmd = schedule.PeriodicCommandFixedDelay.daily_at( - target_time, target=lambda: None - ) - - def naive(dt): - return dt.replace(tzinfo=None) - - assert naive(cmd) == datetime.datetime(2018, 3, 10, 9, 0, 0) - next_ = cmd.next() - assert naive(next_) == datetime.datetime(2018, 3, 11, 9, 0, 0) - assert next_ - cmd == datetime.timedelta(hours=23) - - -class TestScheduler: - def test_invoke_scheduler(self): - sched = schedule.InvokeScheduler() - target = mock.MagicMock() - cmd = schedule.DelayedCommand.after(0, target) - sched.add(cmd) - sched.run_pending() - target.assert_called_once() - assert not sched.queue - - def test_callback_scheduler(self): - callback = mock.MagicMock() - sched = schedule.CallbackScheduler(callback) - target = mock.MagicMock() - cmd = schedule.DelayedCommand.after(0, target) - sched.add(cmd) - sched.run_pending() - callback.assert_called_once_with(target) - - def test_periodic_command(self): - sched = schedule.InvokeScheduler() - target = mock.MagicMock() - - before = datetime.datetime.utcnow() - - cmd = schedule.PeriodicCommand.after(10, target) - sched.add(cmd) - sched.run_pending() - target.assert_not_called() - - with freezegun.freeze_time(before + datetime.timedelta(seconds=15)): - sched.run_pending() - assert sched.queue - target.assert_called_once() - - with freezegun.freeze_time(before + datetime.timedelta(seconds=25)): - sched.run_pending() - assert target.call_count == 2 diff --git a/lib/tempora/tests/test_timing.py b/lib/tempora/tests/test_timing.py deleted file mode 100644 index 43bf7efc..00000000 --- a/lib/tempora/tests/test_timing.py +++ /dev/null @@ -1,50 +0,0 @@ -import datetime -import time -import contextlib -import os -from unittest import mock - -import pytest -from tempora import timing - - -def test_IntervalGovernor(): - """ - IntervalGovernor should prevent a function from being called more than - once per interval. - """ - func_under_test = mock.MagicMock() - # to look like a function, it needs a __name__ attribute - func_under_test.__name__ = 'func_under_test' - interval = datetime.timedelta(seconds=1) - governed = timing.IntervalGovernor(interval)(func_under_test) - governed('a') - governed('b') - governed(3, 'sir') - func_under_test.assert_called_once_with('a') - - -@pytest.fixture -def alt_tz(monkeypatch): - hasattr(time, 'tzset') or pytest.skip("tzset not available") - - @contextlib.contextmanager - def change(): - val = 'AEST-10AEDT-11,M10.5.0,M3.5.0' - with monkeypatch.context() as ctx: - ctx.setitem(os.environ, 'TZ', val) - time.tzset() - yield - time.tzset() - - return change() - - -def test_Stopwatch_timezone_change(alt_tz): - """ - The stopwatch should provide a consistent duration even - if the timezone changes. - """ - watch = timing.Stopwatch() - with alt_tz: - assert abs(watch.split().total_seconds()) < 0.1 diff --git a/requirements.txt b/requirements.txt index b88c9efe..73c9308b 100644 --- a/requirements.txt +++ b/requirements.txt @@ -42,7 +42,7 @@ rumps==0.4.0; platform_system == "Darwin" simplejson==3.18.0 six==1.16.0 soupsieve==2.3.2.post1 -tempora==5.1.0 +tempora==5.2.1 tokenize-rt==5.0.0 tzdata==2022.7 tzlocal==4.2