Update portend==3.1.0

This commit is contained in:
JonnyWong16 2021-11-28 14:22:22 -08:00
parent 5f43c4c890
commit 0325e9327f
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
5 changed files with 519 additions and 73 deletions

View file

@ -1,4 +1,4 @@
from .more import * # noqa
from .recipes import * # noqa
__version__ = '8.10.0'
__version__ = '8.12.0'

View file

@ -22,7 +22,7 @@ from itertools import (
from math import exp, factorial, floor, log
from queue import Empty, Queue
from random import random, randrange, uniform
from operator import itemgetter, mul, sub, gt, lt
from operator import itemgetter, mul, sub, gt, lt, ge, le
from sys import hexversion, maxsize
from time import monotonic
@ -37,7 +37,10 @@ from .recipes import (
__all__ = [
'AbortThread',
'SequenceView',
'UnequalIterablesError',
'adjacent',
'all_unique',
'always_iterable',
'always_reversible',
'bucket',
@ -47,29 +50,31 @@ __all__ = [
'circular_shifts',
'collapse',
'collate',
'combination_index',
'consecutive_groups',
'consumer',
'countable',
'count_cycle',
'mark_ends',
'countable',
'difference',
'distinct_combinations',
'distinct_permutations',
'distribute',
'divide',
'duplicates_everseen',
'duplicates_justseen',
'exactly_n',
'filter_except',
'first',
'groupby_transform',
'ichunked',
'ilen',
'interleave_longest',
'interleave',
'interleave_evenly',
'interleave_longest',
'intersperse',
'is_sorted',
'islice_extended',
'iterate',
'ichunked',
'is_sorted',
'last',
'locate',
'lstrip',
@ -77,6 +82,8 @@ __all__ = [
'map_except',
'map_if',
'map_reduce',
'mark_ends',
'minmax',
'nth_or_last',
'nth_permutation',
'nth_product',
@ -85,8 +92,10 @@ __all__ = [
'only',
'padded',
'partitions',
'set_partitions',
'peekable',
'permutation_index',
'product_index',
'raise_',
'repeat_each',
'repeat_last',
'replace',
@ -95,37 +104,35 @@ __all__ = [
'run_length',
'sample',
'seekable',
'SequenceView',
'set_partitions',
'side_effect',
'sliced',
'sort_together',
'split_at',
'split_after',
'split_at',
'split_before',
'split_when',
'split_into',
'split_when',
'spy',
'stagger',
'strip',
'strictly_n',
'substrings',
'substrings_indexes',
'time_limited',
'unique_in_window',
'unique_to_each',
'unzip',
'value_chain',
'windowed',
'windowed_complete',
'with_iter',
'UnequalIterablesError',
'zip_broadcast',
'zip_equal',
'zip_offset',
'windowed_complete',
'all_unique',
'value_chain',
'product_index',
'combination_index',
'permutation_index',
'zip_broadcast',
]
_marker = object()
@ -585,6 +592,84 @@ def one(iterable, too_short=None, too_long=None):
return first_value
def raise_(exception, *args):
raise exception(*args)
def strictly_n(iterable, n, too_short=None, too_long=None):
"""Validate that *iterable* has exactly *n* items and return them if
it does. If it has fewer than *n* items, call function *too_short*
with those items. If it has more than *n* items, call function
*too_long* with the first ``n + 1`` items.
>>> iterable = ['a', 'b', 'c', 'd']
>>> n = 4
>>> list(strictly_n(iterable, n))
['a', 'b', 'c', 'd']
By default, *too_short* and *too_long* are functions that raise
``ValueError``.
>>> list(strictly_n('ab', 3)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
ValueError: too few items in iterable (got 2)
>>> list(strictly_n('abc', 2)) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
ValueError: too many items in iterable (got at least 3)
You can instead supply functions that do something else.
*too_short* will be called with the number of items in *iterable*.
*too_long* will be called with `n + 1`.
>>> def too_short(item_count):
... raise RuntimeError
>>> it = strictly_n('abcd', 6, too_short=too_short)
>>> list(it) # doctest: +IGNORE_EXCEPTION_DETAIL
Traceback (most recent call last):
...
RuntimeError
>>> def too_long(item_count):
... print('The boss is going to hear about this')
>>> it = strictly_n('abcdef', 4, too_long=too_long)
>>> list(it)
The boss is going to hear about this
['a', 'b', 'c', 'd']
"""
if too_short is None:
too_short = lambda item_count: raise_(
ValueError,
'Too few items in iterable (got {})'.format(item_count),
)
if too_long is None:
too_long = lambda item_count: raise_(
ValueError,
'Too many items in iterable (got at least {})'.format(item_count),
)
it = iter(iterable)
for i in range(n):
try:
item = next(it)
except StopIteration:
too_short(i)
return
else:
yield item
try:
next(it)
except StopIteration:
pass
else:
too_long(n + 1)
def distinct_permutations(iterable, r=None):
"""Yield successive distinct permutations of the elements in *iterable*.
@ -699,8 +784,8 @@ def intersperse(e, iterable, n=1):
if n == 0:
raise ValueError('n must be > 0')
elif n == 1:
# interleave(repeat(e), iterable) -> e, x_0, e, e, x_1, e, x_2...
# islice(..., 1, None) -> x_0, e, e, x_1, e, x_2...
# interleave(repeat(e), iterable) -> e, x_0, e, x_1, e, x_2...
# islice(..., 1, None) -> x_0, e, x_1, e, x_2...
return islice(interleave(repeat(e), iterable), 1, None)
else:
# interleave(filler, chunks) -> [e], [x_0, x_1], [e], [x_2, x_3]...
@ -1580,6 +1665,26 @@ def _zip_equal_generator(iterables):
yield combo
def _zip_equal(*iterables):
# Check whether the iterables are all the same size.
try:
first_size = len(iterables[0])
for i, it in enumerate(iterables[1:], 1):
size = len(it)
if size != first_size:
break
else:
# If we didn't break out, we can use the built-in zip.
return zip(*iterables)
# If we did break out, there was a mismatch.
raise UnequalIterablesError(details=(first_size, i, size))
# If any one of the iterables didn't have a length, start reading
# them until one runs out.
except TypeError:
return _zip_equal_generator(iterables)
def zip_equal(*iterables):
"""``zip`` the input *iterables* together, but raise
``UnequalIterablesError`` if they aren't all the same length.
@ -1607,23 +1712,8 @@ def zip_equal(*iterables):
),
DeprecationWarning,
)
# Check whether the iterables are all the same size.
try:
first_size = len(iterables[0])
for i, it in enumerate(iterables[1:], 1):
size = len(it)
if size != first_size:
break
else:
# If we didn't break out, we can use the built-in zip.
return zip(*iterables)
# If we did break out, there was a mismatch.
raise UnequalIterablesError(details=(first_size, i, size))
# If any one of the iterables didn't have a length, start reading
# them until one runs out.
except TypeError:
return _zip_equal_generator(iterables)
return _zip_equal(*iterables)
def zip_offset(*iterables, offsets, longest=False, fillvalue=None):
@ -3478,7 +3568,7 @@ def sample(iterable, k, weights=None):
return _sample_weighted(iterable, k, weights)
def is_sorted(iterable, key=None, reverse=False):
def is_sorted(iterable, key=None, reverse=False, strict=False):
"""Returns ``True`` if the items of iterable are in sorted order, and
``False`` otherwise. *key* and *reverse* have the same meaning that they do
in the built-in :func:`sorted` function.
@ -3488,12 +3578,20 @@ def is_sorted(iterable, key=None, reverse=False):
>>> is_sorted([5, 4, 3, 1, 2], reverse=True)
False
If *strict*, tests for strict sorting, that is, returns ``False`` if equal
elements are found:
>>> is_sorted([1, 2, 2])
True
>>> is_sorted([1, 2, 2], strict=True)
False
The function returns ``False`` after encountering the first out-of-order
item. If there are no out-of-order items, the iterable is exhausted.
"""
compare = lt if reverse else gt
it = iterable if (key is None) else map(key, iterable)
compare = (le if reverse else ge) if strict else (lt if reverse else gt)
it = iterable if key is None else map(key, iterable)
return not any(starmap(compare, pairwise(it)))
@ -4016,39 +4114,204 @@ def zip_broadcast(*objects, scalar_types=(str, bytes), strict=False):
If the *strict* keyword argument is ``True``, then
``UnequalIterablesError`` will be raised if any of the iterables have
different lengths.
different lengthss.
"""
if not objects:
def is_scalar(obj):
if scalar_types and isinstance(obj, scalar_types):
return True
try:
iter(obj)
except TypeError:
return True
else:
return False
size = len(objects)
if not size:
return
iterables = []
all_scalar = True
for obj in objects:
# If the object is one of our scalar types, turn it into an iterable
# by wrapping it with itertools.repeat
if scalar_types and isinstance(obj, scalar_types):
iterables.append((repeat(obj), False))
# Otherwise, test to see whether the object is iterable.
# If it is, collect it. If it's not, treat it as a scalar.
iterables, iterable_positions = [], []
scalars, scalar_positions = [], []
for i, obj in enumerate(objects):
if is_scalar(obj):
scalars.append(obj)
scalar_positions.append(i)
else:
try:
iterables.append((iter(obj), True))
except TypeError:
iterables.append((repeat(obj), False))
else:
all_scalar = False
iterables.append(iter(obj))
iterable_positions.append(i)
# If all the objects were scalar, we just emit them as a tuple.
# Otherwise we zip the collected iterable objects.
if all_scalar:
if len(scalars) == size:
yield tuple(objects)
else:
yield from zip(*(it for it, is_it in iterables))
return
# For strict mode, we ensure that all the iterable objects have been
# exhausted.
if strict:
for it, is_it in filter(itemgetter(1), iterables):
if next(it, _marker) is not _marker:
raise UnequalIterablesError
zipper = _zip_equal if strict else zip
for item in zipper(*iterables):
new_item = [None] * size
for i, elem in zip(iterable_positions, item):
new_item[i] = elem
for i, elem in zip(scalar_positions, scalars):
new_item[i] = elem
yield tuple(new_item)
def unique_in_window(iterable, n, key=None):
"""Yield the items from *iterable* that haven't been seen recently.
*n* is the size of the lookback window.
>>> iterable = [0, 1, 0, 2, 3, 0]
>>> n = 3
>>> list(unique_in_window(iterable, n))
[0, 1, 2, 3, 0]
The *key* function, if provided, will be used to determine uniqueness:
>>> list(unique_in_window('abAcda', 3, key=lambda x: x.lower()))
['a', 'b', 'c', 'd', 'a']
The items in *iterable* must be hashable.
"""
if n <= 0:
raise ValueError('n must be greater than 0')
window = deque(maxlen=n)
uniques = set()
use_key = key is not None
for item in iterable:
k = key(item) if use_key else item
if k in uniques:
continue
if len(uniques) == n:
uniques.discard(window[0])
uniques.add(k)
window.append(k)
yield item
def duplicates_everseen(iterable, key=None):
"""Yield duplicate elements after their first appearance.
>>> list(duplicates_everseen('mississippi'))
['s', 'i', 's', 's', 'i', 'p', 'i']
>>> list(duplicates_everseen('AaaBbbCccAaa', str.lower))
['a', 'a', 'b', 'b', 'c', 'c', 'A', 'a', 'a']
This function is analagous to :func:`unique_everseen` and is subject to
the same performance considerations.
"""
seen_set = set()
seen_list = []
use_key = key is not None
for element in iterable:
k = key(element) if use_key else element
try:
if k not in seen_set:
seen_set.add(k)
else:
yield element
except TypeError:
if k not in seen_list:
seen_list.append(k)
else:
yield element
def duplicates_justseen(iterable, key=None):
"""Yields serially-duplicate elements after their first appearance.
>>> list(duplicates_justseen('mississippi'))
['s', 's', 'p']
>>> list(duplicates_justseen('AaaBbbCccAaa', str.lower))
['a', 'a', 'b', 'b', 'c', 'c', 'a', 'a']
This function is analagous to :func:`unique_justseen`.
"""
return flatten(
map(
lambda group_tuple: islice_extended(group_tuple[1])[1:],
groupby(iterable, key),
)
)
def minmax(iterable_or_value, *others, key=None, default=_marker):
"""Returns both the smallest and largest items in an iterable
or the largest of two or more arguments.
>>> minmax([3, 1, 5])
(1, 5)
>>> minmax(4, 2, 6)
(2, 6)
If a *key* function is provided, it will be used to transform the input
items for comparison.
>>> minmax([5, 30], key=str) # '30' sorts before '5'
(30, 5)
If a *default* value is provided, it will be returned if there are no
input items.
>>> minmax([], default=(0, 0))
(0, 0)
Otherwise ``ValueError`` is raised.
This function is based on the
`recipe <http://code.activestate.com/recipes/577916/>`__ by
Raymond Hettinger and takes care to minimize the number of comparisons
performed.
"""
iterable = (iterable_or_value, *others) if others else iterable_or_value
it = iter(iterable)
try:
lo = hi = next(it)
except StopIteration as e:
if default is _marker:
raise ValueError(
'`minmax()` argument is an empty iterable. '
'Provide a `default` value to suppress this error.'
) from e
return default
# Different branches depending on the presence of key. This saves a lot
# of unimportant copies which would slow the "key=None" branch
# significantly down.
if key is None:
for x, y in zip_longest(it, it, fillvalue=lo):
if y < x:
x, y = y, x
if x < lo:
lo = x
if hi < y:
hi = y
else:
lo_key = hi_key = key(lo)
for x, y in zip_longest(it, it, fillvalue=lo):
x_key, y_key = key(x), key(y)
if y_key < x_key:
x, y, x_key, y_key = y, x, y_key, x_key
if x_key < lo_key:
lo, lo_key = x, x_key
if hi_key < y_key:
hi, hi_key = y, y_key
return lo, hi

View file

@ -84,6 +84,13 @@ def one(
too_short: Optional[_Raisable] = ...,
too_long: Optional[_Raisable] = ...,
) -> _T: ...
def raise_(exception: _Raisable, *args: Any) -> None: ...
def strictly_n(
iterable: Iterable[_T],
n: int,
too_short: Optional[_GenFn] = ...,
too_long: Optional[_GenFn] = ...,
) -> List[_T]: ...
def distinct_permutations(
iterable: Iterable[_T], r: Optional[int] = ...
) -> Iterator[Tuple[_T, ...]]: ...
@ -293,12 +300,62 @@ def adjacent(
iterable: Iterable[_T],
distance: int = ...,
) -> Iterator[Tuple[bool, _T]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Optional[Callable[[_T], _U]] = ...,
valuefunc: Optional[Callable[[_T], _V]] = ...,
reducefunc: Optional[Callable[..., _W]] = ...,
) -> Iterator[Tuple[_T, _W]]: ...
keyfunc: None = None,
valuefunc: None = None,
reducefunc: None = None,
) -> Iterator[Tuple[_T, Iterator[_T]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: None,
reducefunc: None,
) -> Iterator[Tuple[_U, Iterator[_T]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: None,
valuefunc: Callable[[_T], _V],
reducefunc: None,
) -> Iterable[Tuple[_T, Iterable[_V]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: Callable[[_T], _V],
reducefunc: None,
) -> Iterable[Tuple[_U, Iterator[_V]]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: None,
valuefunc: None,
reducefunc: Callable[[Iterator[_T]], _W],
) -> Iterable[Tuple[_T, _W]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: None,
reducefunc: Callable[[Iterator[_T]], _W],
) -> Iterable[Tuple[_U, _W]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: None,
valuefunc: Callable[[_T], _V],
reducefunc: Callable[[Iterable[_V]], _W],
) -> Iterable[Tuple[_T, _W]]: ...
@overload
def groupby_transform(
iterable: Iterable[_T],
keyfunc: Callable[[_T], _U],
valuefunc: Callable[[_T], _V],
reducefunc: Callable[[Iterable[_V]], _W],
) -> Iterable[Tuple[_U, _W]]: ...
class numeric_range(Generic[_T, _U], Sequence[_T], Hashable, Reversible[_T]):
@overload
@ -494,6 +551,7 @@ def is_sorted(
iterable: Iterable[_T],
key: Optional[Callable[[_T], _U]] = ...,
reverse: bool = False,
strict: bool = False,
) -> bool: ...
class AbortThread(BaseException):
@ -554,3 +612,53 @@ def zip_broadcast(
] = ...,
strict: bool = ...
) -> Iterable[Tuple[_T, ...]]: ...
def unique_in_window(
iterable: Iterable[_T], n: int, key: Optional[Callable[[_T], _U]] = ...
) -> Iterator[_T]: ...
def duplicates_everseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ...
) -> Iterator[_T]: ...
def duplicates_justseen(
iterable: Iterable[_T], key: Optional[Callable[[_T], _U]] = ...
) -> Iterator[_T]: ...
class _SupportsLessThan(Protocol):
def __lt__(self, __other: Any) -> bool: ...
_SupportsLessThanT = TypeVar("_SupportsLessThanT", bound=_SupportsLessThan)
@overload
def minmax(
iterable_or_value: Iterable[_SupportsLessThanT], *, key: None = None
) -> Tuple[_SupportsLessThanT, _SupportsLessThanT]: ...
@overload
def minmax(
iterable_or_value: Iterable[_T], *, key: Callable[[_T], _SupportsLessThan]
) -> Tuple[_T, _T]: ...
@overload
def minmax(
iterable_or_value: Iterable[_SupportsLessThanT],
*,
key: None = None,
default: _U
) -> Union[_U, Tuple[_SupportsLessThanT, _SupportsLessThanT]]: ...
@overload
def minmax(
iterable_or_value: Iterable[_T],
*,
key: Callable[[_T], _SupportsLessThan],
default: _U,
) -> Union[_U, Tuple[_T, _T]]: ...
@overload
def minmax(
iterable_or_value: _SupportsLessThanT,
__other: _SupportsLessThanT,
*others: _SupportsLessThanT
) -> Tuple[_SupportsLessThanT, _SupportsLessThanT]: ...
@overload
def minmax(
iterable_or_value: _T,
__other: _T,
*others: _T,
key: Callable[[_T], _SupportsLessThan]
) -> Tuple[_T, _T]: ...

View file

@ -26,6 +26,7 @@ from random import randrange, sample, choice
__all__ = [
'all_equal',
'before_and_after',
'consume',
'convolve',
'dotproduct',
@ -49,9 +50,11 @@ __all__ = [
'random_product',
'repeatfunc',
'roundrobin',
'sliding_window',
'tabulate',
'tail',
'take',
'triplewise',
'unique_everseen',
'unique_justseen',
]
@ -628,3 +631,68 @@ def convolve(signal, kernel):
for x in chain(signal, repeat(0, n - 1)):
window.append(x)
yield sum(map(operator.mul, kernel, window))
def before_and_after(predicate, it):
"""A variant of :func:`takewhile` that allows complete access to the
remainder of the iterator.
>>> it = iter('ABCdEfGhI')
>>> all_upper, remainder = before_and_after(str.isupper, it)
>>> ''.join(all_upper)
'ABC'
>>> ''.join(remainder) # takewhile() would lose the 'd'
'dEfGhI'
Note that the first iterator must be fully consumed before the second
iterator can generate valid results.
"""
it = iter(it)
transition = []
def true_iterator():
for elem in it:
if predicate(elem):
yield elem
else:
transition.append(elem)
return
def remainder_iterator():
yield from transition
yield from it
return true_iterator(), remainder_iterator()
def triplewise(iterable):
"""Return overlapping triplets from *iterable*.
>>> list(triplewise('ABCDE'))
[('A', 'B', 'C'), ('B', 'C', 'D'), ('C', 'D', 'E')]
"""
for (a, _), (b, c) in pairwise(pairwise(iterable)):
yield a, b, c
def sliding_window(iterable, n):
"""Return a sliding window of width *n* over *iterable*.
>>> list(sliding_window(range(6), 4))
[(0, 1, 2, 3), (1, 2, 3, 4), (2, 3, 4, 5)]
If *iterable* has fewer than *n* items, then nothing is yielded:
>>> list(sliding_window(range(3), 4))
[]
For a variant with more features, see :func:`windowed`.
"""
it = iter(iterable)
window = deque(islice(it, n), maxlen=n)
if len(window) == n:
yield tuple(window)
for x in it:
window.append(x)
yield tuple(window)

View file

@ -103,3 +103,10 @@ def nth_combination(
) -> Tuple[_T, ...]: ...
def prepend(value: _T, iterator: Iterable[_U]) -> Iterator[Union[_T, _U]]: ...
def convolve(signal: Iterable[_T], kernel: Iterable[_T]) -> Iterator[_T]: ...
def before_and_after(
predicate: Callable[[_T], bool], it: Iterable[_T]
) -> Tuple[Iterator[_T], Iterator[_T]]: ...
def triplewise(iterable: Iterable[_T]) -> Iterator[Tuple[_T, _T, _T]]: ...
def sliding_window(
iterable: Iterable[_T], n: int
) -> Iterator[Tuple[_T, ...]]: ...