Bump tempora from 5.1.0 to 5.2.1 (#1977)

* Bump tempora from 5.1.0 to 5.2.1

Bumps [tempora](https://github.com/jaraco/tempora) from 5.1.0 to 5.2.1.
- [Release notes](https://github.com/jaraco/tempora/releases)
- [Changelog](https://github.com/jaraco/tempora/blob/main/CHANGES.rst)
- [Commits](https://github.com/jaraco/tempora/compare/v5.1.0...v5.2.1)

---
updated-dependencies:
- dependency-name: tempora
  dependency-type: direct:production
  update-type: version-update:semver-minor
...

Signed-off-by: dependabot[bot] <support@github.com>

* Update tempora==5.2.1

---------

Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: JonnyWong16 <9099342+JonnyWong16@users.noreply.github.com>

[skip ci]
dependabot[bot] 2023-03-02 20:54:54 -08:00 committed by GitHub
parent 2fda916331
commit 6b1b6d0f32
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 367 additions and 403 deletions

@ -4,6 +4,7 @@ import inspect
import collections
import types
import itertools
import warnings
import more_itertools
@ -266,11 +267,33 @@ def result_invoke(action):
return wrap
def call_aside(f, *args, **kwargs):
def invoke(f, *args, **kwargs):
"""
Call a function for its side effect after initialization.
>>> @call_aside
The benefit of using the decorator instead of simply invoking a function
after defining it is that it makes explicit the author's intent for the
function to be called immediately. Whereas if one simply calls the
function immediately, it's less obvious if that was intentional or
incidental. It also avoids repeating the name - the two actions, defining
the function and calling it immediately are modeled separately, but linked
by the decorator construct.
The benefit of having a function construct (opposed to just invoking some
behavior inline) is to serve as a scope in which the behavior occurs. It
avoids polluting the global namespace with local variables, provides an
anchor on which to attach documentation (docstring), keeps the behavior
logically separated (instead of conceptually separated or not separated at
all), and provides potential to re-use the behavior for testing or other
purposes.
This function is named as a pithy way to communicate, "call this function
primarily for its side effect", or "while defining this function, also
take it aside and call it". It exists because there's no Python construct
for "define and call" (nor should there be, as decorators serve this need
just fine). The behavior happens immediately and synchronously.
>>> @invoke
... def func(): print("called")
called
>>> func()
@ -278,7 +301,7 @@ def call_aside(f, *args, **kwargs):
Use functools.partial to pass parameters to the initial call
>>> @functools.partial(call_aside, name='bingo')
>>> @functools.partial(invoke, name='bingo')
... def func(name): print("called with", name)
called with bingo
"""
@ -286,6 +309,14 @@ def call_aside(f, *args, **kwargs):
return f
def call_aside(*args, **kwargs):
"""
Deprecated name for invoke.
"""
warnings.warn("call_aside is deprecated, use invoke", DeprecationWarning)
return invoke(*args, **kwargs)
class Throttler:
"""
Rate-limit a function (or other callable)
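The hunk above renames call_aside to invoke and keeps the old name as a deprecated alias. A minimal sketch of how the two names behave after this change (illustrative only, assuming the bundled jaraco.functools includes this update):

    import warnings
    from jaraco.functools import invoke, call_aside

    @invoke
    def _announce():
        # Runs once, immediately, at definition time.
        print("initialized")

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")

        @call_aside          # deprecated alias; still invokes the function
        def _legacy():
            pass

    assert any(issubclass(w.category, DeprecationWarning) for w in caught)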

@ -3,4 +3,4 @@
from .more import * # noqa
from .recipes import * # noqa
__version__ = '9.0.0'
__version__ = '9.1.0'

@ -68,6 +68,7 @@ __all__ = [
'exactly_n',
'filter_except',
'first',
'gray_product',
'groupby_transform',
'ichunked',
'iequals',
@ -658,6 +659,7 @@ def distinct_permutations(iterable, r=None):
[(0, 1), (0, 2), (1, 0), (1, 2), (2, 0), (2, 1)]
"""
# Algorithm: https://w.wiki/Qai
def _full(A):
while True:
@ -1301,7 +1303,7 @@ def split_at(iterable, pred, maxsplit=-1, keep_separator=False):
[[0], [2], [4, 5, 6, 7, 8, 9]]
By default, the delimiting items are not included in the output.
The include them, set *keep_separator* to ``True``.
To include them, set *keep_separator* to ``True``.
>>> list(split_at('abcdcba', lambda x: x == 'b', keep_separator=True))
[['a'], ['b'], ['c', 'd', 'c'], ['b'], ['a']]
@ -1391,7 +1393,9 @@ def split_after(iterable, pred, maxsplit=-1):
if pred(item) and buf:
yield buf
if maxsplit == 1:
yield list(it)
buf = list(it)
if buf:
yield buf
return
buf = []
maxsplit -= 1
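The extra lines above change what happens when maxsplit is exhausted on the final item: the remainder of the iterator is only yielded if it is non-empty. A small illustration (my own, not from the diff):

    from more_itertools import split_after

    # The separator is the last element, so nothing remains once maxsplit is hit.
    result = list(split_after('a1', lambda c: c.isdigit(), maxsplit=1))
    # With this change: [['a', '1']]
    # Previously the leftover iterator was yielded unconditionally, producing a trailing [].
    print(result)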
@ -2914,6 +2918,7 @@ def make_decorator(wrapping_func, result_index=0):
'7'
"""
# See https://sites.google.com/site/bbayles/index/decorator_factory for
# notes on how this works.
def decorator(*wrapping_args, **wrapping_kwargs):
@ -3464,7 +3469,6 @@ def _sample_unweighted(iterable, k):
next_index = k + floor(log(random()) / log(1 - W))
for index, element in enumerate(iterable, k):
if index == next_index:
reservoir[randrange(k)] = element
# The new W is the largest in a sample of k U(0, `old_W`) numbers
@ -4284,7 +4288,6 @@ def minmax(iterable_or_value, *others, key=None, default=_marker):
lo_key = hi_key = key(lo)
for x, y in zip_longest(it, it, fillvalue=lo):
x_key, y_key = key(x), key(y)
if y_key < x_key:
@ -4345,3 +4348,45 @@ def constrained_batches(
if batch:
yield tuple(batch)
def gray_product(*iterables):
"""Like :func:`itertools.product`, but return tuples in an order such
that only one element in the generated tuple changes from one iteration
to the next.
>>> list(gray_product('AB','CD'))
[('A', 'C'), ('B', 'C'), ('B', 'D'), ('A', 'D')]
This function consumes all of the input iterables before producing output.
If any of the input iterables have fewer than two items, ``ValueError``
is raised.
For information on the algorithm, see
`this section <https://www-cs-faculty.stanford.edu/~knuth/fasc2a.ps.gz>`__
of Donald Knuth's *The Art of Computer Programming*.
"""
all_iterables = tuple(tuple(x) for x in iterables)
iterable_count = len(all_iterables)
for iterable in all_iterables:
if len(iterable) < 2:
raise ValueError("each iterable must have two or more items")
# This is based on "Algorithm H" from section 7.2.1.1, page 20.
# a holds the indexes of the source iterables for the n-tuple to be yielded
# f is the array of "focus pointers"
# o is the array of "directions"
a = [0] * iterable_count
f = list(range(iterable_count + 1))
o = [1] * iterable_count
while True:
yield tuple(all_iterables[i][a[i]] for i in range(iterable_count))
j = f[0]
f[0] = 0
if j == iterable_count:
break
a[j] = a[j] + o[j]
if a[j] == 0 or a[j] == len(all_iterables[j]) - 1:
o[j] = -o[j]
f[j] = f[j + 1]
f[j + 1] = j + 1
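A quick check of the property gray_product documents, that consecutive output tuples differ in exactly one position (illustrative usage, not part of the diff):

    from more_itertools import gray_product

    tuples = list(gray_product('AB', 'CD', 'EF'))
    assert len(tuples) == 2 * 2 * 2          # every combination appears exactly once
    assert all(
        sum(a != b for a, b in zip(t1, t2)) == 1
        for t1, t2 in zip(tuples, tuples[1:])
    )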

@ -1,26 +1,25 @@
"""Stubs for more_itertools.more"""
from __future__ import annotations
from types import TracebackType
from typing import (
Any,
Callable,
Container,
Dict,
ContextManager,
Generic,
Hashable,
Iterable,
Iterator,
List,
Optional,
overload,
Reversible,
Sequence,
Sized,
Tuple,
Union,
Type,
TypeVar,
type_check_only,
)
from types import TracebackType
from typing_extensions import ContextManager, Protocol, Type, overload
from typing_extensions import Protocol
# Type and type variable definitions
_T = TypeVar('_T')
@ -31,7 +30,7 @@ _V = TypeVar('_V')
_W = TypeVar('_W')
_T_co = TypeVar('_T_co', covariant=True)
_GenFn = TypeVar('_GenFn', bound=Callable[..., Iterator[object]])
_Raisable = Union[BaseException, 'Type[BaseException]']
_Raisable = BaseException | Type[BaseException]
@type_check_only
class _SizedIterable(Protocol[_T_co], Sized, Iterable[_T_co]): ...
@ -39,23 +38,25 @@ class _SizedIterable(Protocol[_T_co], Sized, Iterable[_T_co]): ...
@type_check_only
class _SizedReversible(Protocol[_T_co], Sized, Reversible[_T_co]): ...
@type_check_only
class _SupportsSlicing(Protocol[_T_co]):
def __getitem__(self, __k: slice) -> _T_co: ...
def chunked(
iterable: Iterable[_T], n: Optional[int], strict: bool = ...
) -> Iterator[List[_T]]: ...
iterable: Iterable[_T], n: int | None, strict: bool = ...
) -> Iterator[list[_T]]: ...
@overload
def first(iterable: Iterable[_T]) -> _T: ...
@overload
def first(iterable: Iterable[_T], default: _U) -> Union[_T, _U]: ...
def first(iterable: Iterable[_T], default: _U) -> _T | _U: ...
@overload
def last(iterable: Iterable[_T]) -> _T: ...
@overload
def last(iterable: Iterable[_T], default: _U) -> Union[_T, _U]: ...
def last(iterable: Iterable[_T], default: _U) -> _T | _U: ...
@overload
def nth_or_last(iterable: Iterable[_T], n: int) -> _T: ...
@overload
def nth_or_last(
iterable: Iterable[_T], n: int, default: _U
) -> Union[_T, _U]: ...
def nth_or_last(iterable: Iterable[_T], n: int, default: _U) -> _T | _U: ...
class peekable(Generic[_T], Iterator[_T]):
def __init__(self, iterable: Iterable[_T]) -> None: ...
@ -64,13 +65,13 @@ class peekable(Generic[_T], Iterator[_T]):
@overload
def peek(self) -> _T: ...
@overload
def peek(self, default: _U) -> Union[_T, _U]: ...
def peek(self, default: _U) -> _T | _U: ...
def prepend(self, *items: _T) -> None: ...
def __next__(self) -> _T: ...
@overload
def __getitem__(self, index: int) -> _T: ...
@overload
def __getitem__(self, index: slice) -> List[_T]: ...
def __getitem__(self, index: slice) -> list[_T]: ...
def consumer(func: _GenFn) -> _GenFn: ...
def ilen(iterable: Iterable[object]) -> int: ...
@ -80,42 +81,42 @@ def with_iter(
) -> Iterator[_T]: ...
def one(
iterable: Iterable[_T],
too_short: Optional[_Raisable] = ...,
too_long: Optional[_Raisable] = ...,
too_short: _Raisable | None = ...,
too_long: _Raisable | None = ...,
) -> _T: ...
def raise_(exception: _Raisable, *args: Any) -> None: ...
def strictly_n(
iterable: Iterable[_T],
n: int,
too_short: Optional[_GenFn] = ...,
too_long: Optional[_GenFn] = ...,
) -> List[_T]: ...
too_short: _GenFn | None = ...,
too_long: _GenFn | None = ...,
) -> list[_T]: ...
def distinct_permutations(
iterable: Iterable[_T], r: Optional[int] = ...
) -> Iterator[Tuple[_T, ...]]: ...
iterable: Iterable[_T], r: int | None = ...
) -> Iterator[tuple[_T, ...]]: ...
def intersperse(
e: _U, iterable: Iterable[_T], n: int = ...
) -> Iterator[Union[_T, _U]]: ...
def unique_to_each(*iterables: Iterable[_T]) -> List[List[_T]]: ...
) -> Iterator[_T | _U]: ...
def unique_to_each(*iterables: Iterable[_T]) -> list[list[_T]]: ...
@overload
def windowed(
seq: Iterable[_T], n: int, *, step: int = ...
) -> Iterator[Tuple[Optional[_T], ...]]: ...
) -> Iterator[tuple[_T | None, ...]]: ...
@overload
def windowed(
seq: Iterable[_T], n: int, fillvalue: _U, step: int = ...
) -> Iterator[Tuple[Union[_T, _U], ...]]: ...
def substrings(iterable: Iterable[_T]) -> Iterator[Tuple[_T, ...]]: ...
) -> Iterator[tuple[_T | _U, ...]]: ...
def substrings(iterable: Iterable[_T]) -> Iterator[tuple[_T, ...]]: ...
def substrings_indexes(
seq: Sequence[_T], reverse: bool = ...
) -> Iterator[Tuple[Sequence[_T], int, int]]: ...
) -> Iterator[tuple[Sequence[_T], int, int]]: ...
class bucket(Generic[_T, _U], Container[_U]):
def __init__(
self,
iterable: Iterable[_T],
key: Callable[[_T], _U],
validator: Optional[Callable[[object], object]] = ...,
validator: Callable[[object], object] | None = ...,
) -> None: ...
def __contains__(self, value: object) -> bool: ...
def __iter__(self) -> Iterator[_U]: ...
@ -123,109 +124,105 @@ class bucket(Generic[_T, _U], Container[_U]):
def spy(
iterable: Iterable[_T], n: int = ...
) -> Tuple[List[_T], Iterator[_T]]: ...
) -> tuple[list[_T], Iterator[_T]]: ...
def interleave(*iterables: Iterable[_T]) -> Iterator[_T]: ...
def interleave_longest(*iterables: Iterable[_T]) -> Iterator[_T]: ...
def interleave_evenly(
iterables: List[Iterable[_T]], lengths: Optional[List[int]] = ...
iterables: list[Iterable[_T]], lengths: list[int] | None = ...
) -> Iterator[_T]: ...
def collapse(
iterable: Iterable[Any],
base_type: Optional[type] = ...,
levels: Optional[int] = ...,
base_type: type | None = ...,
levels: int | None = ...,
) -> Iterator[Any]: ...
@overload
def side_effect(
func: Callable[[_T], object],
iterable: Iterable[_T],
chunk_size: None = ...,
before: Optional[Callable[[], object]] = ...,
after: Optional[Callable[[], object]] = ...,
before: Callable[[], object] | None = ...,
after: Callable[[], object] | None = ...,
) -> Iterator[_T]: ...
@overload
def side_effect(
func: Callable[[List[_T]], object],
func: Callable[[list[_T]], object],
iterable: Iterable[_T],
chunk_size: int,
before: Optional[Callable[[], object]] = ...,
after: Optional[Callable[[], object]] = ...,
before: Callable[[], object] | None = ...,
after: Callable[[], object] | None = ...,
) -> Iterator[_T]: ...
def sliced(
seq: Sequence[_T], n: int, strict: bool = ...
) -> Iterator[Sequence[_T]]: ...
seq: _SupportsSlicing[_T], n: int, strict: bool = ...
) -> Iterator[_T]: ...
def split_at(
iterable: Iterable[_T],
pred: Callable[[_T], object],
maxsplit: int = ...,
keep_separator: bool = ...,
) -> Iterator[List[_T]]: ...
) -> Iterator[list[_T]]: ...
def split_before(
iterable: Iterable[_T], pred: Callable[[_T], object], maxsplit: int = ...
) -> Iterator[List[_T]]: ...
) -> Iterator[list[_T]]: ...
def split_after(
iterable: Iterable[_T], pred: Callable[[_T], object], maxsplit: int = ...
) -> Iterator[List[_T]]: ...
) -> Iterator[list[_T]]: ...
def split_when(
iterable: Iterable[_T],
pred: Callable[[_T, _T], object],
maxsplit: int = ...,
) -> Iterator[List[_T]]: ...
) -> Iterator[list[_T]]: ...
def split_into(
iterable: Iterable[_T], sizes: Iterable[Optional[int]]
) -> Iterator[List[_T]]: ...
iterable: Iterable[_T], sizes: Iterable[int | None]
) -> Iterator[list[_T]]: ...
@overload
def padded(
iterable: Iterable[_T],
*,
n: Optional[int] = ...,
n: int | None = ...,
next_multiple: bool = ...,
) -> Iterator[Optional[_T]]: ...
) -> Iterator[_T | None]: ...
@overload
def padded(
iterable: Iterable[_T],
fillvalue: _U,
n: Optional[int] = ...,
n: int | None = ...,
next_multiple: bool = ...,
) -> Iterator[Union[_T, _U]]: ...
) -> Iterator[_T | _U]: ...
@overload
def repeat_last(iterable: Iterable[_T]) -> Iterator[_T]: ...
@overload
def repeat_last(
iterable: Iterable[_T], default: _U
) -> Iterator[Union[_T, _U]]: ...
def distribute(n: int, iterable: Iterable[_T]) -> List[Iterator[_T]]: ...
def repeat_last(iterable: Iterable[_T], default: _U) -> Iterator[_T | _U]: ...
def distribute(n: int, iterable: Iterable[_T]) -> list[Iterator[_T]]: ...
@overload
def stagger(
iterable: Iterable[_T],
offsets: _SizedIterable[int] = ...,
longest: bool = ...,
) -> Iterator[Tuple[Optional[_T], ...]]: ...
) -> Iterator[tuple[_T | None, ...]]: ...
@overload
def stagger(
iterable: Iterable[_T],
offsets: _SizedIterable[int] = ...,
longest: bool = ...,
fillvalue: _U = ...,
) -> Iterator[Tuple[Union[_T, _U], ...]]: ...
) -> Iterator[tuple[_T | _U, ...]]: ...
class UnequalIterablesError(ValueError):
def __init__(
self, details: Optional[Tuple[int, int, int]] = ...
) -> None: ...
def __init__(self, details: tuple[int, int, int] | None = ...) -> None: ...
@overload
def zip_equal(__iter1: Iterable[_T1]) -> Iterator[Tuple[_T1]]: ...
def zip_equal(__iter1: Iterable[_T1]) -> Iterator[tuple[_T1]]: ...
@overload
def zip_equal(
__iter1: Iterable[_T1], __iter2: Iterable[_T2]
) -> Iterator[Tuple[_T1, _T2]]: ...
) -> Iterator[tuple[_T1, _T2]]: ...
@overload
def zip_equal(
__iter1: Iterable[_T],
__iter2: Iterable[_T],
__iter3: Iterable[_T],
*iterables: Iterable[_T],
) -> Iterator[Tuple[_T, ...]]: ...
) -> Iterator[tuple[_T, ...]]: ...
@overload
def zip_offset(
__iter1: Iterable[_T1],
@ -233,7 +230,7 @@ def zip_offset(
offsets: _SizedIterable[int],
longest: bool = ...,
fillvalue: None = None,
) -> Iterator[Tuple[Optional[_T1]]]: ...
) -> Iterator[tuple[_T1 | None]]: ...
@overload
def zip_offset(
__iter1: Iterable[_T1],
@ -242,7 +239,7 @@ def zip_offset(
offsets: _SizedIterable[int],
longest: bool = ...,
fillvalue: None = None,
) -> Iterator[Tuple[Optional[_T1], Optional[_T2]]]: ...
) -> Iterator[tuple[_T1 | None, _T2 | None]]: ...
@overload
def zip_offset(
__iter1: Iterable[_T],
@ -252,7 +249,7 @@ def zip_offset(
offsets: _SizedIterable[int],
longest: bool = ...,
fillvalue: None = None,
) -> Iterator[Tuple[Optional[_T], ...]]: ...
) -> Iterator[tuple[_T | None, ...]]: ...
@overload
def zip_offset(
__iter1: Iterable[_T1],
@ -260,7 +257,7 @@ def zip_offset(
offsets: _SizedIterable[int],
longest: bool = ...,
fillvalue: _U,
) -> Iterator[Tuple[Union[_T1, _U]]]: ...
) -> Iterator[tuple[_T1 | _U]]: ...
@overload
def zip_offset(
__iter1: Iterable[_T1],
@ -269,7 +266,7 @@ def zip_offset(
offsets: _SizedIterable[int],
longest: bool = ...,
fillvalue: _U,
) -> Iterator[Tuple[Union[_T1, _U], Union[_T2, _U]]]: ...
) -> Iterator[tuple[_T1 | _U, _T2 | _U]]: ...
@overload
def zip_offset(
__iter1: Iterable[_T],
@ -279,82 +276,80 @@ def zip_offset(
offsets: _SizedIterable[int],
longest: bool = ...,
fillvalue: _U,
) -> Iterator[Tuple[Union[_T, _U], ...]]: ...
) -> Iterator[tuple[_T | _U, ...]]: ...
def sort_together(
iterables: Iterable[Iterable[_T]],
key_list: Iterable[int] = ...,
key: Optional[Callable[..., Any]] = ...,
key: Callable[..., Any] | None = ...,
reverse: bool = ...,
) -> List[Tuple[_T, ...]]: ...
def unzip(iterable: Iterable[Sequence[_T]]) -> Tuple[Iterator[_T], ...]: ...
def divide(n: int, iterable: Iterable[_T]) -> List[Iterator[_T]]: ...
) -> list[tuple[_T, ...]]: ...
def unzip(iterable: Iterable[Sequence[_T]]) -> tuple[Iterator[_T], ...]: ...
def divide(n: int, iterable: Iterable[_T]) -> list[Iterator[_T]]: ...
def always_iterable(
obj: object,
base_type: Union[
type, Tuple[Union[type, Tuple[Any, ...]], ...], None
] = ...,
base_type: type | tuple[type | tuple[Any, ...], ...] | None = ...,
) -> Iterator[Any]: ...
def adjacent(
predicate: Callable[[_T], bool],
iterable: Iterable[_T],
distance: int = ...,
) -> Iterator[Tuple[bool, _T]]: ...
) -> Iterator[tuple[bool, _T]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: None = None,
valuefunc: None = None,
reducefunc: None = None,
) -> Iterator[Tuple[_T, Iterator[_T]]]: ...
) -> Iterator[tuple[_T, Iterator[_T]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: None,
reducefunc: None,
) -> Iterator[Tuple[_U, Iterator[_T]]]: ...
) -> Iterator[tuple[_U, Iterator[_T]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: None,
valuefunc: Callable[[_T], _V],
reducefunc: None,
) -> Iterable[Tuple[_T, Iterable[_V]]]: ...
) -> Iterable[tuple[_T, Iterable[_V]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: Callable[[_T], _V],
reducefunc: None,
) -> Iterable[Tuple[_U, Iterator[_V]]]: ...
) -> Iterable[tuple[_U, Iterator[_V]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: None,
valuefunc: None,
reducefunc: Callable[[Iterator[_T]], _W],
) -> Iterable[Tuple[_T, _W]]: ...
) -> Iterable[tuple[_T, _W]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: None,
reducefunc: Callable[[Iterator[_T]], _W],
) -> Iterable[Tuple[_U, _W]]: ...
) -> Iterable[tuple[_U, _W]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: None,
valuefunc: Callable[[_T], _V],
reducefunc: Callable[[Iterable[_V]], _W],
) -> Iterable[Tuple[_T, _W]]: ...
) -> Iterable[tuple[_T, _W]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: Callable[[_T], _V],
reducefunc: Callable[[Iterable[_V]], _W],
) -> Iterable[Tuple[_U, _W]]: ...
) -> Iterable[tuple[_U, _W]]: ...
class numeric_range(Generic[_T, _U], Sequence[_T], Hashable, Reversible[_T]):
@overload
@ -375,22 +370,22 @@ class numeric_range(Generic[_T, _U], Sequence[_T], Hashable, Reversible[_T]):
def __len__(self) -> int: ...
def __reduce__(
self,
) -> Tuple[Type[numeric_range[_T, _U]], Tuple[_T, _T, _U]]: ...
) -> tuple[Type[numeric_range[_T, _U]], tuple[_T, _T, _U]]: ...
def __repr__(self) -> str: ...
def __reversed__(self) -> Iterator[_T]: ...
def count(self, value: _T) -> int: ...
def index(self, value: _T) -> int: ... # type: ignore
def count_cycle(
iterable: Iterable[_T], n: Optional[int] = ...
) -> Iterable[Tuple[int, _T]]: ...
iterable: Iterable[_T], n: int | None = ...
) -> Iterable[tuple[int, _T]]: ...
def mark_ends(
iterable: Iterable[_T],
) -> Iterable[Tuple[bool, bool, _T]]: ...
) -> Iterable[tuple[bool, bool, _T]]: ...
def locate(
iterable: Iterable[object],
pred: Callable[..., Any] = ...,
window_size: Optional[int] = ...,
window_size: int | None = ...,
) -> Iterator[int]: ...
def lstrip(
iterable: Iterable[_T], pred: Callable[[_T], object]
@ -403,9 +398,7 @@ def strip(
) -> Iterator[_T]: ...
class islice_extended(Generic[_T], Iterator[_T]):
def __init__(
self, iterable: Iterable[_T], *args: Optional[int]
) -> None: ...
def __init__(self, iterable: Iterable[_T], *args: int | None) -> None: ...
def __iter__(self) -> islice_extended[_T]: ...
def __next__(self) -> _T: ...
def __getitem__(self, index: slice) -> islice_extended[_T]: ...
@ -420,7 +413,7 @@ def difference(
func: Callable[[_T, _T], _U] = ...,
*,
initial: None = ...,
) -> Iterator[Union[_T, _U]]: ...
) -> Iterator[_T | _U]: ...
@overload
def difference(
iterable: Iterable[_T], func: Callable[[_T, _T], _U] = ..., *, initial: _U
@ -436,7 +429,7 @@ class SequenceView(Generic[_T], Sequence[_T]):
class seekable(Generic[_T], Iterator[_T]):
def __init__(
self, iterable: Iterable[_T], maxlen: Optional[int] = ...
self, iterable: Iterable[_T], maxlen: int | None = ...
) -> None: ...
def __iter__(self) -> seekable[_T]: ...
def __next__(self) -> _T: ...
@ -444,20 +437,20 @@ class seekable(Generic[_T], Iterator[_T]):
@overload
def peek(self) -> _T: ...
@overload
def peek(self, default: _U) -> Union[_T, _U]: ...
def peek(self, default: _U) -> _T | _U: ...
def elements(self) -> SequenceView[_T]: ...
def seek(self, index: int) -> None: ...
class run_length:
@staticmethod
def encode(iterable: Iterable[_T]) -> Iterator[Tuple[_T, int]]: ...
def encode(iterable: Iterable[_T]) -> Iterator[tuple[_T, int]]: ...
@staticmethod
def decode(iterable: Iterable[Tuple[_T, int]]) -> Iterator[_T]: ...
def decode(iterable: Iterable[tuple[_T, int]]) -> Iterator[_T]: ...
def exactly_n(
iterable: Iterable[_T], n: int, predicate: Callable[[_T], object] = ...
) -> bool: ...
def circular_shifts(iterable: Iterable[_T]) -> List[Tuple[_T, ...]]: ...
def circular_shifts(iterable: Iterable[_T]) -> list[tuple[_T, ...]]: ...
def make_decorator(
wrapping_func: Callable[..., _U], result_index: int = ...
) -> Callable[..., Callable[[Callable[..., Any]], Callable[..., _U]]]: ...
@ -467,44 +460,44 @@ def map_reduce(
keyfunc: Callable[[_T], _U],
valuefunc: None = ...,
reducefunc: None = ...,
) -> Dict[_U, List[_T]]: ...
) -> dict[_U, list[_T]]: ...
@overload
def map_reduce(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: Callable[[_T], _V],
reducefunc: None = ...,
) -> Dict[_U, List[_V]]: ...
) -> dict[_U, list[_V]]: ...
@overload
def map_reduce(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: None = ...,
reducefunc: Callable[[List[_T]], _W] = ...,
) -> Dict[_U, _W]: ...
reducefunc: Callable[[list[_T]], _W] = ...,
) -> dict[_U, _W]: ...
@overload
def map_reduce(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: Callable[[_T], _V],
reducefunc: Callable[[List[_V]], _W],
) -> Dict[_U, _W]: ...
reducefunc: Callable[[list[_V]], _W],
) -> dict[_U, _W]: ...
def rlocate(
iterable: Iterable[_T],
pred: Callable[..., object] = ...,
window_size: Optional[int] = ...,
window_size: int | None = ...,
) -> Iterator[int]: ...
def replace(
iterable: Iterable[_T],
pred: Callable[..., object],
substitutes: Iterable[_U],
count: Optional[int] = ...,
count: int | None = ...,
window_size: int = ...,
) -> Iterator[Union[_T, _U]]: ...
def partitions(iterable: Iterable[_T]) -> Iterator[List[List[_T]]]: ...
) -> Iterator[_T | _U]: ...
def partitions(iterable: Iterable[_T]) -> Iterator[list[list[_T]]]: ...
def set_partitions(
iterable: Iterable[_T], k: Optional[int] = ...
) -> Iterator[List[List[_T]]]: ...
iterable: Iterable[_T], k: int | None = ...
) -> Iterator[list[list[_T]]]: ...
class time_limited(Generic[_T], Iterator[_T]):
def __init__(
@ -515,16 +508,16 @@ class time_limited(Generic[_T], Iterator[_T]):
@overload
def only(
iterable: Iterable[_T], *, too_long: Optional[_Raisable] = ...
) -> Optional[_T]: ...
iterable: Iterable[_T], *, too_long: _Raisable | None = ...
) -> _T | None: ...
@overload
def only(
iterable: Iterable[_T], default: _U, too_long: Optional[_Raisable] = ...
) -> Union[_T, _U]: ...
iterable: Iterable[_T], default: _U, too_long: _Raisable | None = ...
) -> _T | _U: ...
def ichunked(iterable: Iterable[_T], n: int) -> Iterator[Iterator[_T]]: ...
def distinct_combinations(
iterable: Iterable[_T], r: int
) -> Iterator[Tuple[_T, ...]]: ...
) -> Iterator[tuple[_T, ...]]: ...
def filter_except(
validator: Callable[[Any], object],
iterable: Iterable[_T],
@ -539,16 +532,16 @@ def map_if(
iterable: Iterable[Any],
pred: Callable[[Any], bool],
func: Callable[[Any], Any],
func_else: Optional[Callable[[Any], Any]] = ...,
func_else: Callable[[Any], Any] | None = ...,
) -> Iterator[Any]: ...
def sample(
iterable: Iterable[_T],
k: int,
weights: Optional[Iterable[float]] = ...,
) -> List[_T]: ...
weights: Iterable[float] | None = ...,
) -> list[_T]: ...
def is_sorted(
iterable: Iterable[_T],
key: Optional[Callable[[_T], _U]] = ...,
key: Callable[[_T], _U] | None = ...,
reverse: bool = False,
strict: bool = False,
) -> bool: ...
@ -566,10 +559,10 @@ class callback_iter(Generic[_T], Iterator[_T]):
def __enter__(self) -> callback_iter[_T]: ...
def __exit__(
self,
exc_type: Optional[Type[BaseException]],
exc_value: Optional[BaseException],
traceback: Optional[TracebackType],
) -> Optional[bool]: ...
exc_type: Type[BaseException] | None,
exc_value: BaseException | None,
traceback: TracebackType | None,
) -> bool | None: ...
def __iter__(self) -> callback_iter[_T]: ...
def __next__(self) -> _T: ...
def _reader(self) -> Iterator[_T]: ...
@ -580,15 +573,15 @@ class callback_iter(Generic[_T], Iterator[_T]):
def windowed_complete(
iterable: Iterable[_T], n: int
) -> Iterator[Tuple[_T, ...]]: ...
) -> Iterator[tuple[_T, ...]]: ...
def all_unique(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ...
iterable: Iterable[_T], key: Callable[[_T], _U] | None = ...
) -> bool: ...
def nth_product(index: int, *args: Iterable[_T]) -> Tuple[_T, ...]: ...
def nth_product(index: int, *args: Iterable[_T]) -> tuple[_T, ...]: ...
def nth_permutation(
iterable: Iterable[_T], r: int, index: int
) -> Tuple[_T, ...]: ...
def value_chain(*args: Union[_T, Iterable[_T]]) -> Iterable[_T]: ...
) -> tuple[_T, ...]: ...
def value_chain(*args: _T | Iterable[_T]) -> Iterable[_T]: ...
def product_index(element: Iterable[_T], *args: Iterable[_T]) -> int: ...
def combination_index(
element: Iterable[_T], iterable: Iterable[_T]
@ -603,22 +596,20 @@ class countable(Generic[_T], Iterator[_T]):
def __iter__(self) -> countable[_T]: ...
def __next__(self) -> _T: ...
def chunked_even(iterable: Iterable[_T], n: int) -> Iterator[List[_T]]: ...
def chunked_even(iterable: Iterable[_T], n: int) -> Iterator[list[_T]]: ...
def zip_broadcast(
*objects: Union[_T, Iterable[_T]],
scalar_types: Union[
type, Tuple[Union[type, Tuple[Any, ...]], ...], None
] = ...,
*objects: _T | Iterable[_T],
scalar_types: type | tuple[type | tuple[Any, ...], ...] | None = ...,
strict: bool = ...,
) -> Iterable[Tuple[_T, ...]]: ...
) -> Iterable[tuple[_T, ...]]: ...
def unique_in_window(
iterable: Iterable[_T], n: int, key: Optional[Callable[[_T], _U]] = ...
iterable: Iterable[_T], n: int, key: Callable[[_T], _U] | None = ...
) -> Iterator[_T]: ...
def duplicates_everseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ...
iterable: Iterable[_T], key: Callable[[_T], _U] | None = ...
) -> Iterator[_T]: ...
def duplicates_justseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ...
iterable: Iterable[_T], key: Callable[[_T], _U] | None = ...
) -> Iterator[_T]: ...
class _SupportsLessThan(Protocol):
@ -629,38 +620,38 @@ _SupportsLessThanT = TypeVar("_SupportsLessThanT", bound=_SupportsLessThan)
@overload
def minmax(
iterable_or_value: Iterable[_SupportsLessThanT], *, key: None = None
) -> Tuple[_SupportsLessThanT, _SupportsLessThanT]: ...
) -> tuple[_SupportsLessThanT, _SupportsLessThanT]: ...
@overload
def minmax(
iterable_or_value: Iterable[_T], *, key: Callable[[_T], _SupportsLessThan]
) -> Tuple[_T, _T]: ...
) -> tuple[_T, _T]: ...
@overload
def minmax(
iterable_or_value: Iterable[_SupportsLessThanT],
*,
key: None = None,
default: _U,
) -> Union[_U, Tuple[_SupportsLessThanT, _SupportsLessThanT]]: ...
) -> _U | tuple[_SupportsLessThanT, _SupportsLessThanT]: ...
@overload
def minmax(
iterable_or_value: Iterable[_T],
*,
key: Callable[[_T], _SupportsLessThan],
default: _U,
) -> Union[_U, Tuple[_T, _T]]: ...
) -> _U | tuple[_T, _T]: ...
@overload
def minmax(
iterable_or_value: _SupportsLessThanT,
__other: _SupportsLessThanT,
*others: _SupportsLessThanT,
) -> Tuple[_SupportsLessThanT, _SupportsLessThanT]: ...
) -> tuple[_SupportsLessThanT, _SupportsLessThanT]: ...
@overload
def minmax(
iterable_or_value: _T,
__other: _T,
*others: _T,
key: Callable[[_T], _SupportsLessThan],
) -> Tuple[_T, _T]: ...
) -> tuple[_T, _T]: ...
def longest_common_prefix(
iterables: Iterable[Iterable[_T]],
) -> Iterator[_T]: ...
@ -668,7 +659,8 @@ def iequals(*iterables: Iterable[object]) -> bool: ...
def constrained_batches(
iterable: Iterable[object],
max_size: int,
max_count: Optional[int] = ...,
max_count: int | None = ...,
get_len: Callable[[_T], object] = ...,
strict: bool = ...,
) -> Iterator[Tuple[_T]]: ...
) -> Iterator[tuple[_T]]: ...
def gray_product(*iterables: Iterable[_T]) -> Iterator[tuple[_T, ...]]: ...
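The stub changes above all follow one pattern: with "from __future__ import annotations" at the top of the file, typing.Optional/Union/List/Tuple/Dict are replaced by PEP 604 unions and PEP 585 builtin generics. A small sketch of the style (my own example, not a stub from the package):

    from __future__ import annotations  # lets older interpreters accept these annotations

    from typing import Iterable, Iterator, TypeVar

    _T = TypeVar('_T')

    # Old style:  def chunked(iterable: Iterable[_T], n: Optional[int]) -> Iterator[List[_T]]: ...
    # New style:
    def chunked(iterable: Iterable[_T], n: int | None = None) -> Iterator[list[_T]]: ...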

@ -9,6 +9,7 @@ Some backward-compatible usability improvements have been made.
"""
import math
import operator
import warnings
from collections import deque
from collections.abc import Sized
@ -21,12 +22,14 @@ from itertools import (
cycle,
groupby,
islice,
product,
repeat,
starmap,
tee,
zip_longest,
)
from random import randrange, sample, choice
from sys import hexversion
__all__ = [
'all_equal',
@ -36,9 +39,12 @@ __all__ = [
'convolve',
'dotproduct',
'first_true',
'factor',
'flatten',
'grouper',
'iter_except',
'iter_index',
'matmul',
'ncycles',
'nth',
'nth_combination',
@ -62,6 +68,7 @@ __all__ = [
'tabulate',
'tail',
'take',
'transpose',
'triplewise',
'unique_everseen',
'unique_justseen',
@ -808,6 +815,35 @@ def polynomial_from_roots(roots):
]
def iter_index(iterable, value, start=0):
"""Yield the index of each place in *iterable* that *value* occurs,
beginning with index *start*.
See :func:`locate` for a more general means of finding the indexes
associated with particular values.
>>> list(iter_index('AABCADEAF', 'A'))
[0, 1, 4, 7]
"""
try:
seq_index = iterable.index
except AttributeError:
# Slow path for general iterables
it = islice(iterable, start, None)
for i, element in enumerate(it, start):
if element is value or element == value:
yield i
else:
# Fast path for sequences
i = start - 1
try:
while True:
i = seq_index(value, i + 1)
yield i
except ValueError:
pass
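iter_index takes two routes: sequences that expose an index method use it directly, while other iterables fall back to enumerate. Illustrative usage (not part of the diff):

    from more_itertools import iter_index

    list(iter_index('AABCADEAF', 'A', start=2))   # [4, 7]  (fast path via str.index)
    list(iter_index(iter('AABCADEAF'), 'A'))      # [0, 1, 4, 7]  (generic enumerate path)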
def sieve(n):
"""Yield the primes less than n.
@ -815,13 +851,13 @@ def sieve(n):
[2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
"""
isqrt = getattr(math, 'isqrt', lambda x: int(math.sqrt(x)))
data = bytearray((0, 1)) * (n // 2)
data[:3] = 0, 0, 0
limit = isqrt(n) + 1
data = bytearray([1]) * n
data[:2] = 0, 0
for p in compress(range(limit), data):
data[p + p : n : p] = bytearray(len(range(p + p, n, p)))
return compress(count(), data)
data[p * p : n : p + p] = bytes(len(range(p * p, n, p + p)))
data[2] = 1
return iter_index(data, 1) if n > 2 else iter([])
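The rewritten sieve marks composites in a bytearray and then streams the surviving indexes through iter_index, with a guard for small n. Expected behaviour, for illustration:

    from more_itertools import sieve

    list(sieve(30))   # [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
    list(sieve(2))    # []  -- the n > 2 guard returns an empty iterator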
def batched(iterable, n):
@ -833,9 +869,62 @@ def batched(iterable, n):
This recipe is from the ``itertools`` docs. This library also provides
:func:`chunked`, which has a different implementation.
"""
if hexversion >= 0x30C00A0: # Python 3.12.0a0
warnings.warn(
(
'batched will be removed in a future version of '
'more-itertools. Use the standard library '
'itertools.batched function instead'
),
DeprecationWarning,
)
it = iter(iterable)
while True:
batch = list(islice(it, n))
if not batch:
break
yield batch
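batched now warns on interpreters that ship itertools.batched. Illustrative usage (note that this version yields lists, whereas itertools.batched yields tuples):

    from more_itertools import batched

    list(batched('ABCDEFG', 3))   # [['A', 'B', 'C'], ['D', 'E', 'F'], ['G']]
    # On Python 3.12+ the call above also emits the DeprecationWarning added here.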
def transpose(it):
"""Swap the rows and columns of the input.
>>> list(transpose([(1, 2, 3), (11, 22, 33)]))
[(1, 11), (2, 22), (3, 33)]
The caller should ensure that the dimensions of the input are compatible.
"""
# TODO: when 3.9 goes end-of-life, add strict=True to this.
return zip(*it)
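Because transpose is a thin wrapper over zip, ragged input is silently truncated to the shortest row; that is what the TODO about strict=True refers to. For illustration:

    from more_itertools import transpose

    list(transpose([(1, 2, 3), (11, 22)]))   # [(1, 11), (2, 22)] -- the trailing 3 is dropped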
def matmul(m1, m2):
"""Multiply two matrices.
>>> list(matmul([(7, 5), (3, 5)], [(2, 5), (7, 9)]))
[[49, 80], [41, 60]]
The caller should ensure that the dimensions of the input matrices are
compatible with each other.
"""
n = len(m2[0])
return batched(starmap(dotproduct, product(m1, transpose(m2))), n)
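matmul composes dotproduct over the rows of m1 and the columns of m2 (via transpose), then regroups the results with batched. A cross-check against a plain nested comprehension (illustrative, not from the diff):

    from more_itertools import matmul

    m1 = [(7, 5), (3, 5)]
    m2 = [(2, 5), (7, 9)]
    expected = [
        [sum(a * b for a, b in zip(row, col)) for col in zip(*m2)]
        for row in m1
    ]
    assert list(matmul(m1, m2)) == expected   # [[49, 80], [41, 60]]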
def factor(n):
"""Yield the prime factors of n.
>>> list(factor(360))
[2, 2, 2, 3, 3, 5]
"""
isqrt = getattr(math, 'isqrt', lambda x: int(math.sqrt(x)))
for prime in sieve(isqrt(n) + 1):
while True:
quotient, remainder = divmod(n, prime)
if remainder:
break
yield prime
n = quotient
if n == 1:
return
if n >= 2:
yield n
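factor trial-divides by the primes up to the integer square root of n and yields whatever prime factor is left over at the end. Illustrative usage:

    from math import prod
    from more_itertools import factor

    fs = list(factor(2023))   # [7, 17, 17]
    assert prod(fs) == 2023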

@ -1,110 +1,119 @@
"""Stubs for more_itertools.recipes"""
from __future__ import annotations
from typing import (
Any,
Callable,
Iterable,
Iterator,
List,
Optional,
overload,
Sequence,
Tuple,
Type,
TypeVar,
Union,
)
from typing_extensions import overload, Type
# Type and type variable definitions
_T = TypeVar('_T')
_U = TypeVar('_U')
def take(n: int, iterable: Iterable[_T]) -> List[_T]: ...
def take(n: int, iterable: Iterable[_T]) -> list[_T]: ...
def tabulate(
function: Callable[[int], _T], start: int = ...
) -> Iterator[_T]: ...
def tail(n: int, iterable: Iterable[_T]) -> Iterator[_T]: ...
def consume(iterator: Iterable[object], n: Optional[int] = ...) -> None: ...
def consume(iterator: Iterable[object], n: int | None = ...) -> None: ...
@overload
def nth(iterable: Iterable[_T], n: int) -> Optional[_T]: ...
def nth(iterable: Iterable[_T], n: int) -> _T | None: ...
@overload
def nth(iterable: Iterable[_T], n: int, default: _U) -> Union[_T, _U]: ...
def nth(iterable: Iterable[_T], n: int, default: _U) -> _T | _U: ...
def all_equal(iterable: Iterable[object]) -> bool: ...
def quantify(
iterable: Iterable[_T], pred: Callable[[_T], bool] = ...
) -> int: ...
def pad_none(iterable: Iterable[_T]) -> Iterator[Optional[_T]]: ...
def padnone(iterable: Iterable[_T]) -> Iterator[Optional[_T]]: ...
def pad_none(iterable: Iterable[_T]) -> Iterator[_T | None]: ...
def padnone(iterable: Iterable[_T]) -> Iterator[_T | None]: ...
def ncycles(iterable: Iterable[_T], n: int) -> Iterator[_T]: ...
def dotproduct(vec1: Iterable[object], vec2: Iterable[object]) -> object: ...
def flatten(listOfLists: Iterable[Iterable[_T]]) -> Iterator[_T]: ...
def repeatfunc(
func: Callable[..., _U], times: Optional[int] = ..., *args: Any
func: Callable[..., _U], times: int | None = ..., *args: Any
) -> Iterator[_U]: ...
def pairwise(iterable: Iterable[_T]) -> Iterator[Tuple[_T, _T]]: ...
def pairwise(iterable: Iterable[_T]) -> Iterator[tuple[_T, _T]]: ...
def grouper(
iterable: Iterable[_T],
n: int,
incomplete: str = ...,
fillvalue: _U = ...,
) -> Iterator[Tuple[Union[_T, _U], ...]]: ...
) -> Iterator[tuple[_T | _U, ...]]: ...
def roundrobin(*iterables: Iterable[_T]) -> Iterator[_T]: ...
def partition(
pred: Optional[Callable[[_T], object]], iterable: Iterable[_T]
) -> Tuple[Iterator[_T], Iterator[_T]]: ...
def powerset(iterable: Iterable[_T]) -> Iterator[Tuple[_T, ...]]: ...
pred: Callable[[_T], object] | None, iterable: Iterable[_T]
) -> tuple[Iterator[_T], Iterator[_T]]: ...
def powerset(iterable: Iterable[_T]) -> Iterator[tuple[_T, ...]]: ...
def unique_everseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ...
iterable: Iterable[_T], key: Callable[[_T], _U] | None = ...
) -> Iterator[_T]: ...
def unique_justseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], object]] = ...
iterable: Iterable[_T], key: Callable[[_T], object] | None = ...
) -> Iterator[_T]: ...
@overload
def iter_except(
func: Callable[[], _T],
exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]],
exception: Type[BaseException] | tuple[Type[BaseException], ...],
first: None = ...,
) -> Iterator[_T]: ...
@overload
def iter_except(
func: Callable[[], _T],
exception: Union[Type[BaseException], Tuple[Type[BaseException], ...]],
exception: Type[BaseException] | tuple[Type[BaseException], ...],
first: Callable[[], _U],
) -> Iterator[Union[_T, _U]]: ...
) -> Iterator[_T | _U]: ...
@overload
def first_true(
iterable: Iterable[_T], *, pred: Optional[Callable[[_T], object]] = ...
) -> Optional[_T]: ...
iterable: Iterable[_T], *, pred: Callable[[_T], object] | None = ...
) -> _T | None: ...
@overload
def first_true(
iterable: Iterable[_T],
default: _U,
pred: Optional[Callable[[_T], object]] = ...,
) -> Union[_T, _U]: ...
pred: Callable[[_T], object] | None = ...,
) -> _T | _U: ...
def random_product(
*args: Iterable[_T], repeat: int = ...
) -> Tuple[_T, ...]: ...
) -> tuple[_T, ...]: ...
def random_permutation(
iterable: Iterable[_T], r: Optional[int] = ...
) -> Tuple[_T, ...]: ...
def random_combination(iterable: Iterable[_T], r: int) -> Tuple[_T, ...]: ...
iterable: Iterable[_T], r: int | None = ...
) -> tuple[_T, ...]: ...
def random_combination(iterable: Iterable[_T], r: int) -> tuple[_T, ...]: ...
def random_combination_with_replacement(
iterable: Iterable[_T], r: int
) -> Tuple[_T, ...]: ...
) -> tuple[_T, ...]: ...
def nth_combination(
iterable: Iterable[_T], r: int, index: int
) -> Tuple[_T, ...]: ...
def prepend(value: _T, iterator: Iterable[_U]) -> Iterator[Union[_T, _U]]: ...
) -> tuple[_T, ...]: ...
def prepend(value: _T, iterator: Iterable[_U]) -> Iterator[_T | _U]: ...
def convolve(signal: Iterable[_T], kernel: Iterable[_T]) -> Iterator[_T]: ...
def before_and_after(
predicate: Callable[[_T], bool], it: Iterable[_T]
) -> Tuple[Iterator[_T], Iterator[_T]]: ...
def triplewise(iterable: Iterable[_T]) -> Iterator[Tuple[_T, _T, _T]]: ...
) -> tuple[Iterator[_T], Iterator[_T]]: ...
def triplewise(iterable: Iterable[_T]) -> Iterator[tuple[_T, _T, _T]]: ...
def sliding_window(
iterable: Iterable[_T], n: int
) -> Iterator[Tuple[_T, ...]]: ...
def subslices(iterable: Iterable[_T]) -> Iterator[List[_T]]: ...
def polynomial_from_roots(roots: Sequence[int]) -> List[int]: ...
) -> Iterator[tuple[_T, ...]]: ...
def subslices(iterable: Iterable[_T]) -> Iterator[list[_T]]: ...
def polynomial_from_roots(roots: Sequence[int]) -> list[int]: ...
def iter_index(
iterable: Iterable[object],
value: Any,
start: int | None = ...,
) -> Iterator[int]: ...
def sieve(n: int) -> Iterator[int]: ...
def batched(
iterable: Iterable[_T],
n: int,
) -> Iterator[List[_T]]: ...
) -> Iterator[list[_T]]: ...
def transpose(
it: Iterable[Iterable[_T]],
) -> tuple[Iterator[_T], ...]: ...
def matmul(m1: Sequence[_T], m2: Sequence[_T]) -> Iterator[list[_T]]: ...
def factor(n: int) -> Iterator[int]: ...

@ -11,9 +11,6 @@ from typing import Union, Tuple, Iterable
from typing import cast
from jaraco.functools import once
# some useful constants
osc_per_year = 290_091_329_207_984_000
"""
@ -36,7 +33,7 @@ seconds_per_month = seconds_per_year / 12
hours_per_month = hours_per_day * days_per_year / 12
@once
@functools.lru_cache()
def _needs_year_help() -> bool:
"""
Some versions of Python render %Y with only three characters :(
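For a zero-argument callable like _needs_year_help, functools.lru_cache() gives the same compute-once behaviour that jaraco.functools' once decorator provided, which is presumably why the decorator could be swapped here. A sketch with a hypothetical stand-in function:

    import functools

    @functools.lru_cache()
    def _probe() -> bool:          # hypothetical stand-in for _needs_year_help
        print("probing strftime")  # runs only on the first call
        return False

    _probe()
    _probe()   # served from the cache; nothing printed the second time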

@ -1,149 +0,0 @@
import time
import random
import datetime
from unittest import mock
import pytest
import pytz
import freezegun
from tempora import schedule
do_nothing = type(None)
def test_delayed_command_order():
"""
delayed commands should be sorted by delay time
"""
delays = [random.randint(0, 99) for x in range(5)]
cmds = sorted(
[schedule.DelayedCommand.after(delay, do_nothing) for delay in delays]
)
assert [c.delay.seconds for c in cmds] == sorted(delays)
def test_periodic_command_delay():
"A PeriodicCommand must have a positive, non-zero delay."
with pytest.raises(ValueError) as exc_info:
schedule.PeriodicCommand.after(0, None)
assert str(exc_info.value) == test_periodic_command_delay.__doc__
def test_periodic_command_fixed_delay():
"""
Test that we can construct a periodic command with a fixed initial
delay.
"""
fd = schedule.PeriodicCommandFixedDelay.at_time(
at=schedule.now(), delay=datetime.timedelta(seconds=2), target=lambda: None
)
assert fd.due() is True
assert fd.next().due() is False
class TestCommands:
def test_delayed_command_from_timestamp(self):
"""
Ensure a delayed command can be constructed from a timestamp.
"""
t = time.time()
schedule.DelayedCommand.at_time(t, do_nothing)
def test_command_at_noon(self):
"""
Create a periodic command that's run at noon every day.
"""
when = datetime.time(12, 0, tzinfo=pytz.utc)
cmd = schedule.PeriodicCommandFixedDelay.daily_at(when, target=None)
assert cmd.due() is False
next_cmd = cmd.next()
daily = datetime.timedelta(days=1)
day_from_now = schedule.now() + daily
two_days_from_now = day_from_now + daily
assert day_from_now < next_cmd < two_days_from_now
@pytest.mark.parametrize("hour", range(10, 14))
@pytest.mark.parametrize("tz_offset", (14, -14))
def test_command_at_noon_distant_local(self, hour, tz_offset):
"""
Run test_command_at_noon, but with the local timezone
more than 12 hours away from UTC.
"""
with freezegun.freeze_time(f"2020-01-10 {hour:02}:01", tz_offset=tz_offset):
self.test_command_at_noon()
class TestTimezones:
def test_alternate_timezone_west(self):
target_tz = pytz.timezone('US/Pacific')
target = schedule.now().astimezone(target_tz)
cmd = schedule.DelayedCommand.at_time(target, target=None)
assert cmd.due()
def test_alternate_timezone_east(self):
target_tz = pytz.timezone('Europe/Amsterdam')
target = schedule.now().astimezone(target_tz)
cmd = schedule.DelayedCommand.at_time(target, target=None)
assert cmd.due()
def test_daylight_savings(self):
"""
A command at 9am should always be 9am regardless of
a DST boundary.
"""
with freezegun.freeze_time('2018-03-10 08:00:00'):
target_tz = pytz.timezone('US/Eastern')
target_time = datetime.time(9, tzinfo=target_tz)
cmd = schedule.PeriodicCommandFixedDelay.daily_at(
target_time, target=lambda: None
)
def naive(dt):
return dt.replace(tzinfo=None)
assert naive(cmd) == datetime.datetime(2018, 3, 10, 9, 0, 0)
next_ = cmd.next()
assert naive(next_) == datetime.datetime(2018, 3, 11, 9, 0, 0)
assert next_ - cmd == datetime.timedelta(hours=23)
class TestScheduler:
def test_invoke_scheduler(self):
sched = schedule.InvokeScheduler()
target = mock.MagicMock()
cmd = schedule.DelayedCommand.after(0, target)
sched.add(cmd)
sched.run_pending()
target.assert_called_once()
assert not sched.queue
def test_callback_scheduler(self):
callback = mock.MagicMock()
sched = schedule.CallbackScheduler(callback)
target = mock.MagicMock()
cmd = schedule.DelayedCommand.after(0, target)
sched.add(cmd)
sched.run_pending()
callback.assert_called_once_with(target)
def test_periodic_command(self):
sched = schedule.InvokeScheduler()
target = mock.MagicMock()
before = datetime.datetime.utcnow()
cmd = schedule.PeriodicCommand.after(10, target)
sched.add(cmd)
sched.run_pending()
target.assert_not_called()
with freezegun.freeze_time(before + datetime.timedelta(seconds=15)):
sched.run_pending()
assert sched.queue
target.assert_called_once()
with freezegun.freeze_time(before + datetime.timedelta(seconds=25)):
sched.run_pending()
assert target.call_count == 2

@ -1,50 +0,0 @@
import datetime
import time
import contextlib
import os
from unittest import mock
import pytest
from tempora import timing
def test_IntervalGovernor():
"""
IntervalGovernor should prevent a function from being called more than
once per interval.
"""
func_under_test = mock.MagicMock()
# to look like a function, it needs a __name__ attribute
func_under_test.__name__ = 'func_under_test'
interval = datetime.timedelta(seconds=1)
governed = timing.IntervalGovernor(interval)(func_under_test)
governed('a')
governed('b')
governed(3, 'sir')
func_under_test.assert_called_once_with('a')
@pytest.fixture
def alt_tz(monkeypatch):
hasattr(time, 'tzset') or pytest.skip("tzset not available")
@contextlib.contextmanager
def change():
val = 'AEST-10AEDT-11,M10.5.0,M3.5.0'
with monkeypatch.context() as ctx:
ctx.setitem(os.environ, 'TZ', val)
time.tzset()
yield
time.tzset()
return change()
def test_Stopwatch_timezone_change(alt_tz):
"""
The stopwatch should provide a consistent duration even
if the timezone changes.
"""
watch = timing.Stopwatch()
with alt_tz:
assert abs(watch.split().total_seconds()) < 0.1

@ -42,7 +42,7 @@ rumps==0.4.0; platform_system == "Darwin"
simplejson==3.18.0
six==1.16.0
soupsieve==2.3.2.post1
tempora==5.1.0
tempora==5.2.1
tokenize-rt==5.0.0
tzdata==2022.7
tzlocal==4.2