Update cherrypy==18.9.0

This commit is contained in:
JonnyWong16 2024-03-24 17:55:12 -07:00
commit 51196a7fb1
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
137 changed files with 44442 additions and 11582 deletions

View file

@ -5,23 +5,49 @@ import itertools
import copy
import functools
import random
from collections.abc import Container, Iterable, Mapping
from typing import Callable, Union
from jaraco.classes.properties import NonDataProperty
import jaraco.text
_Matchable = Union[Callable, Container, Iterable, re.Pattern]
def _dispatch(obj: _Matchable) -> Callable:
# can't rely on singledispatch for Union[Container, Iterable]
# due to ambiguity
# (https://peps.python.org/pep-0443/#abstract-base-classes).
if isinstance(obj, re.Pattern):
return obj.fullmatch
if not isinstance(obj, Callable): # type: ignore
if not isinstance(obj, Container):
obj = set(obj) # type: ignore
obj = obj.__contains__
return obj # type: ignore
class Projection(collections.abc.Mapping):
"""
Project a set of keys over a mapping
>>> sample = {'a': 1, 'b': 2, 'c': 3}
>>> prj = Projection(['a', 'c', 'd'], sample)
>>> prj == {'a': 1, 'c': 3}
>>> dict(prj)
{'a': 1, 'c': 3}
Projection also accepts an iterable or callable or pattern.
>>> iter_prj = Projection(iter('acd'), sample)
>>> call_prj = Projection(lambda k: ord(k) in (97, 99, 100), sample)
>>> pat_prj = Projection(re.compile(r'[acd]'), sample)
>>> prj == iter_prj == call_prj == pat_prj
True
Keys should only appear if they were specified and exist in the space.
Order is retained.
>>> sorted(list(prj.keys()))
>>> list(prj)
['a', 'c']
Attempting to access a key not in the projection
@ -36,119 +62,58 @@ class Projection(collections.abc.Mapping):
>>> target = {'a': 2, 'b': 2}
>>> target.update(prj)
>>> target == {'a': 1, 'b': 2, 'c': 3}
True
>>> target
{'a': 1, 'b': 2, 'c': 3}
Also note that Projection keeps a reference to the original dict, so
if you modify the original dict, that could modify the Projection.
Projection keeps a reference to the original dict, so
modifying the original dict may modify the Projection.
>>> del sample['a']
>>> dict(prj)
{'c': 3}
"""
def __init__(self, keys, space):
self._keys = tuple(keys)
def __init__(self, keys: _Matchable, space: Mapping):
self._match = _dispatch(keys)
self._space = space
def __getitem__(self, key):
if key not in self._keys:
if not self._match(key):
raise KeyError(key)
return self._space[key]
def _keys_resolved(self):
return filter(self._match, self._space)
def __iter__(self):
return iter(set(self._keys).intersection(self._space))
return self._keys_resolved()
def __len__(self):
return len(tuple(iter(self)))
return len(tuple(self._keys_resolved()))
class DictFilter(collections.abc.Mapping):
class Mask(Projection):
"""
Takes a dict, and simulates a sub-dict based on the keys.
The inverse of a :class:`Projection`, masking out keys.
>>> sample = {'a': 1, 'b': 2, 'c': 3}
>>> filtered = DictFilter(sample, ['a', 'c'])
>>> filtered == {'a': 1, 'c': 3}
True
>>> set(filtered.values()) == {1, 3}
True
>>> set(filtered.items()) == {('a', 1), ('c', 3)}
True
One can also filter by a regular expression pattern
>>> sample['d'] = 4
>>> sample['ef'] = 5
Here we filter for only single-character keys
>>> filtered = DictFilter(sample, include_pattern='.$')
>>> filtered == {'a': 1, 'b': 2, 'c': 3, 'd': 4}
True
>>> filtered['e']
Traceback (most recent call last):
...
KeyError: 'e'
>>> 'e' in filtered
False
Pattern is useful for excluding keys with a prefix.
>>> filtered = DictFilter(sample, include_pattern=r'(?![ace])')
>>> dict(filtered)
{'b': 2, 'd': 4}
Also note that DictFilter keeps a reference to the original dict, so
if you modify the original dict, that could modify the filtered dict.
>>> del sample['d']
>>> dict(filtered)
>>> msk = Mask(['a', 'c', 'd'], sample)
>>> dict(msk)
{'b': 2}
"""
def __init__(self, dict, include_keys=[], include_pattern=None):
self.dict = dict
self.specified_keys = set(include_keys)
if include_pattern is not None:
self.include_pattern = re.compile(include_pattern)
else:
# for performance, replace the pattern_keys property
self.pattern_keys = set()
def get_pattern_keys(self):
keys = filter(self.include_pattern.match, self.dict.keys())
return set(keys)
pattern_keys = NonDataProperty(get_pattern_keys)
@property
def include_keys(self):
return self.specified_keys | self.pattern_keys
def __getitem__(self, i):
if i not in self.include_keys:
raise KeyError(i)
return self.dict[i]
def __iter__(self):
return filter(self.include_keys.__contains__, self.dict.keys())
def __len__(self):
return len(list(self))
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# self._match = compose(operator.not_, self._match)
self._match = lambda key, orig=self._match: not orig(key)
def dict_map(function, dictionary):
"""
dict_map is much like the built-in function map. It takes a dictionary
and applys a function to the values of that dictionary, returning a
new dictionary with the mapped values in the original keys.
Return a new dict with function applied to values of dictionary.
>>> d = dict_map(lambda x:x+1, dict(a=1, b=2))
>>> d == dict(a=2,b=3)
True
>>> dict_map(lambda x: x+1, dict(a=1, b=2))
{'a': 2, 'b': 3}
"""
return dict((key, function(value)) for key, value in dictionary.items())
@ -164,7 +129,7 @@ class RangeMap(dict):
One may supply keyword parameters to be passed to the sort function used
to sort keys (i.e. key, reverse) as sort_params.
Let's create a map that maps 1-3 -> 'a', 4-6 -> 'b'
Create a map that maps 1-3 -> 'a', 4-6 -> 'b'
>>> r = RangeMap({3: 'a', 6: 'b'}) # boy, that was easy
>>> r[1], r[2], r[3], r[4], r[5], r[6]
@ -176,7 +141,7 @@ class RangeMap(dict):
>>> r[4.5]
'b'
But you'll notice that the way rangemap is defined, it must be open-ended
Notice that the way rangemap is defined, it must be open-ended
on one side.
>>> r[0]
@ -279,7 +244,7 @@ class RangeMap(dict):
return (sorted_keys[RangeMap.first_item], sorted_keys[RangeMap.last_item])
# some special values for the RangeMap
undefined_value = type(str('RangeValueUndefined'), (), {})()
undefined_value = type('RangeValueUndefined', (), {})()
class Item(int):
"RangeMap Item"
@ -294,7 +259,7 @@ def __identity(x):
def sorted_items(d, key=__identity, reverse=False):
"""
Return the items of the dictionary sorted by the keys
Return the items of the dictionary sorted by the keys.
>>> sample = dict(foo=20, bar=42, baz=10)
>>> tuple(sorted_items(sample))
@ -307,6 +272,7 @@ def sorted_items(d, key=__identity, reverse=False):
>>> tuple(sorted_items(sample, reverse=True))
(('foo', 20), ('baz', 10), ('bar', 42))
"""
# wrap the key func so it operates on the first element of each item
def pairkey_key(item):
return key(item[0])
@ -475,7 +441,7 @@ class ItemsAsAttributes:
Mix-in class to enable a mapping object to provide items as
attributes.
>>> C = type(str('C'), (dict, ItemsAsAttributes), dict())
>>> C = type('C', (dict, ItemsAsAttributes), dict())
>>> i = C()
>>> i['foo'] = 'bar'
>>> i.foo
@ -504,7 +470,7 @@ class ItemsAsAttributes:
>>> missing_func = lambda self, key: 'missing item'
>>> C = type(
... str('C'),
... 'C',
... (dict, ItemsAsAttributes),
... dict(__missing__ = missing_func),
... )

View file

@ -5,10 +5,18 @@ import functools
import tempfile
import shutil
import operator
import warnings
@contextlib.contextmanager
def pushd(dir):
"""
>>> tmp_path = getfixture('tmp_path')
>>> with pushd(tmp_path):
... assert os.getcwd() == os.fspath(tmp_path)
>>> assert os.getcwd() != os.fspath(tmp_path)
"""
orig = os.getcwd()
os.chdir(dir)
try:
@ -29,6 +37,8 @@ def tarball_context(url, target_dir=None, runner=None, pushd=pushd):
target_dir = os.path.basename(url).replace('.tar.gz', '').replace('.tgz', '')
if runner is None:
runner = functools.partial(subprocess.check_call, shell=True)
else:
warnings.warn("runner parameter is deprecated", DeprecationWarning)
# In the tar command, use --strip-components=1 to strip the first path and
# then
# use -C to cause the files to be extracted to {target_dir}. This ensures
@ -48,6 +58,15 @@ def tarball_context(url, target_dir=None, runner=None, pushd=pushd):
def infer_compression(url):
"""
Given a URL or filename, infer the compression code for tar.
>>> infer_compression('http://foo/bar.tar.gz')
'z'
>>> infer_compression('http://foo/bar.tgz')
'z'
>>> infer_compression('file.bz')
'j'
>>> infer_compression('file.xz')
'J'
"""
# cheat and just assume it's the last two characters
compression_indicator = url[-2:]
@ -61,6 +80,12 @@ def temp_dir(remover=shutil.rmtree):
"""
Create a temporary directory context. Pass a custom remover
to override the removal behavior.
>>> import pathlib
>>> with temp_dir() as the_dir:
... assert os.path.isdir(the_dir)
... _ = pathlib.Path(the_dir).joinpath('somefile').write_text('contents')
>>> assert not os.path.exists(the_dir)
"""
temp_dir = tempfile.mkdtemp()
try:
@ -90,6 +115,12 @@ def repo_context(url, branch=None, quiet=True, dest_ctx=temp_dir):
@contextlib.contextmanager
def null():
"""
A null context suitable to stand in for a meaningful context.
>>> with null() as value:
... assert value is None
"""
yield
@ -112,6 +143,10 @@ class ExceptionTrap:
... raise ValueError("1 + 1 is not 3")
>>> bool(trap)
True
>>> trap.value
ValueError('1 + 1 is not 3')
>>> trap.tb
<traceback object at ...>
>>> with ExceptionTrap(ValueError) as trap:
... raise Exception()
@ -211,3 +246,43 @@ class suppress(contextlib.suppress, contextlib.ContextDecorator):
... {}['']
>>> key_error()
"""
class on_interrupt(contextlib.ContextDecorator):
"""
Replace a KeyboardInterrupt with SystemExit(1)
>>> def do_interrupt():
... raise KeyboardInterrupt()
>>> on_interrupt('error')(do_interrupt)()
Traceback (most recent call last):
...
SystemExit: 1
>>> on_interrupt('error', code=255)(do_interrupt)()
Traceback (most recent call last):
...
SystemExit: 255
>>> on_interrupt('suppress')(do_interrupt)()
>>> with __import__('pytest').raises(KeyboardInterrupt):
... on_interrupt('ignore')(do_interrupt)()
"""
def __init__(
self,
action='error',
# py3.7 compat
# /,
code=1,
):
self.action = action
self.code = code
def __enter__(self):
return self
def __exit__(self, exctype, excinst, exctb):
if exctype is not KeyboardInterrupt or self.action == 'ignore':
return
elif self.action == 'error':
raise SystemExit(self.code) from excinst
return self.action == 'suppress'

View file

@ -1,4 +1,4 @@
import collections
import collections.abc
import functools
import inspect
import itertools
@ -9,11 +9,6 @@ import warnings
import more_itertools
from typing import Callable, TypeVar
CallableT = TypeVar("CallableT", bound=Callable[..., object])
def compose(*funcs):
"""
@ -39,24 +34,6 @@ def compose(*funcs):
return functools.reduce(compose_two, funcs)
def method_caller(method_name, *args, **kwargs):
"""
Return a function that will call a named method on the
target object with optional positional and keyword
arguments.
>>> lower = method_caller('lower')
>>> lower('MyString')
'mystring'
"""
def call_method(target):
func = getattr(target, method_name)
return func(*args, **kwargs)
return call_method
def once(func):
"""
Decorate func so it's only ever called the first time.
@ -99,12 +76,7 @@ def once(func):
return wrapper
def method_cache(
method: CallableT,
cache_wrapper: Callable[
[CallableT], CallableT
] = functools.lru_cache(), # type: ignore[assignment]
) -> CallableT:
def method_cache(method, cache_wrapper=functools.lru_cache()):
"""
Wrap lru_cache to support storing the cache data in the object instances.
@ -172,22 +144,17 @@ def method_cache(
for another implementation and additional justification.
"""
def wrapper(self: object, *args: object, **kwargs: object) -> object:
def wrapper(self, *args, **kwargs):
# it's the first call, replace the method with a cached, bound method
bound_method: CallableT = types.MethodType( # type: ignore[assignment]
method, self
)
bound_method = types.MethodType(method, self)
cached_method = cache_wrapper(bound_method)
setattr(self, method.__name__, cached_method)
return cached_method(*args, **kwargs)
# Support cache clear even before cache has been created.
wrapper.cache_clear = lambda: None # type: ignore[attr-defined]
wrapper.cache_clear = lambda: None
return (
_special_method_cache(method, cache_wrapper) # type: ignore[return-value]
or wrapper
)
return _special_method_cache(method, cache_wrapper) or wrapper
def _special_method_cache(method, cache_wrapper):
@ -203,12 +170,13 @@ def _special_method_cache(method, cache_wrapper):
"""
name = method.__name__
special_names = '__getattr__', '__getitem__'
if name not in special_names:
return
return None
wrapper_name = '__cached' + name
def proxy(self, *args, **kwargs):
def proxy(self, /, *args, **kwargs):
if wrapper_name not in vars(self):
bound = types.MethodType(method, self)
cache = cache_wrapper(bound)
@ -245,7 +213,7 @@ def result_invoke(action):
r"""
Decorate a function with an action function that is
invoked on the results returned from the decorated
function (for its side-effect), then return the original
function (for its side effect), then return the original
result.
>>> @result_invoke(print)
@ -269,7 +237,7 @@ def result_invoke(action):
return wrap
def invoke(f, *args, **kwargs):
def invoke(f, /, *args, **kwargs):
"""
Call a function for its side effect after initialization.
@ -304,25 +272,15 @@ def invoke(f, *args, **kwargs):
Use functools.partial to pass parameters to the initial call
>>> @functools.partial(invoke, name='bingo')
... def func(name): print("called with", name)
... def func(name): print('called with', name)
called with bingo
"""
f(*args, **kwargs)
return f
def call_aside(*args, **kwargs):
"""
Deprecated name for invoke.
"""
warnings.warn("call_aside is deprecated, use invoke", DeprecationWarning)
return invoke(*args, **kwargs)
class Throttler:
"""
Rate-limit a function (or other callable)
"""
"""Rate-limit a function (or other callable)."""
def __init__(self, func, max_rate=float('Inf')):
if isinstance(func, Throttler):
@ -339,20 +297,20 @@ class Throttler:
return self.func(*args, **kwargs)
def _wait(self):
"ensure at least 1/max_rate seconds from last call"
"""Ensure at least 1/max_rate seconds from last call."""
elapsed = time.time() - self.last_called
must_wait = 1 / self.max_rate - elapsed
time.sleep(max(0, must_wait))
self.last_called = time.time()
def __get__(self, obj, type=None):
def __get__(self, obj, owner=None):
return first_invoke(self._wait, functools.partial(self.func, obj))
def first_invoke(func1, func2):
"""
Return a function that when invoked will invoke func1 without
any parameters (for its side-effect) and then invoke func2
any parameters (for its side effect) and then invoke func2
with whatever parameters were passed, returning its result.
"""
@ -363,6 +321,17 @@ def first_invoke(func1, func2):
return wrapper
method_caller = first_invoke(
lambda: warnings.warn(
'`jaraco.functools.method_caller` is deprecated, '
'use `operator.methodcaller` instead',
DeprecationWarning,
stacklevel=3,
),
operator.methodcaller,
)
def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
"""
Given a callable func, trap the indicated exceptions
@ -371,7 +340,7 @@ def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
to propagate.
"""
attempts = itertools.count() if retries == float('inf') else range(retries)
for attempt in attempts:
for _ in attempts:
try:
return func()
except trap:
@ -408,7 +377,7 @@ def retry(*r_args, **r_kwargs):
def print_yielded(func):
"""
Convert a generator into a function that prints all yielded elements
Convert a generator into a function that prints all yielded elements.
>>> @print_yielded
... def x():
@ -424,7 +393,7 @@ def print_yielded(func):
def pass_none(func):
"""
Wrap func so it's not called if its first param is None
Wrap func so it's not called if its first param is None.
>>> print_text = pass_none(print)
>>> print_text('text')
@ -433,9 +402,10 @@ def pass_none(func):
"""
@functools.wraps(func)
def wrapper(param, *args, **kwargs):
def wrapper(param, /, *args, **kwargs):
if param is not None:
return func(param, *args, **kwargs)
return None
return wrapper
@ -509,7 +479,7 @@ def save_method_args(method):
args_and_kwargs = collections.namedtuple('args_and_kwargs', 'args kwargs')
@functools.wraps(method)
def wrapper(self, *args, **kwargs):
def wrapper(self, /, *args, **kwargs):
attr_name = '_saved_' + method.__name__
attr = args_and_kwargs(args, kwargs)
setattr(self, attr_name, attr)
@ -559,6 +529,13 @@ def except_(*exceptions, replace=None, use=None):
def identity(x):
"""
Return the argument.
>>> o = object()
>>> identity(o) is o
True
"""
return x
@ -580,7 +557,7 @@ def bypass_when(check, *, _op=identity):
def decorate(func):
@functools.wraps(func)
def wrapper(param):
def wrapper(param, /):
return param if _op(check) else func(param)
return wrapper
@ -604,3 +581,53 @@ def bypass_unless(check):
2
"""
return bypass_when(check, _op=operator.not_)
@functools.singledispatch
def _splat_inner(args, func):
"""Splat args to func."""
return func(*args)
@_splat_inner.register
def _(args: collections.abc.Mapping, func):
"""Splat kargs to func as kwargs."""
return func(**args)
def splat(func):
"""
Wrap func to expect its parameters to be passed positionally in a tuple.
Has a similar effect to that of ``itertools.starmap`` over
simple ``map``.
>>> pairs = [(-1, 1), (0, 2)]
>>> more_itertools.consume(itertools.starmap(print, pairs))
-1 1
0 2
>>> more_itertools.consume(map(splat(print), pairs))
-1 1
0 2
The approach generalizes to other iterators that don't have a "star"
equivalent, such as a "starfilter".
>>> list(filter(splat(operator.add), pairs))
[(0, 2)]
Splat also accepts a mapping argument.
>>> def is_nice(msg, code):
... return "smile" in msg or code == 0
>>> msgs = [
... dict(msg='smile!', code=20),
... dict(msg='error :(', code=1),
... dict(msg='unknown', code=0),
... ]
>>> for msg in filter(splat(is_nice), msgs):
... print(msg)
{'msg': 'smile!', 'code': 20}
{'msg': 'unknown', 'code': 0}
"""
return functools.wraps(func)(functools.partial(_splat_inner, func=func))

View file

@ -0,0 +1,128 @@
from collections.abc import Callable, Hashable, Iterator
from functools import partial
from operator import methodcaller
import sys
from typing import (
Any,
Generic,
Protocol,
TypeVar,
overload,
)
if sys.version_info >= (3, 10):
from typing import Concatenate, ParamSpec
else:
from typing_extensions import Concatenate, ParamSpec
_P = ParamSpec('_P')
_R = TypeVar('_R')
_T = TypeVar('_T')
_R1 = TypeVar('_R1')
_R2 = TypeVar('_R2')
_V = TypeVar('_V')
_S = TypeVar('_S')
_R_co = TypeVar('_R_co', covariant=True)
class _OnceCallable(Protocol[_P, _R]):
saved_result: _R
reset: Callable[[], None]
def __call__(self, *args: _P.args, **kwargs: _P.kwargs) -> _R: ...
class _ProxyMethodCacheWrapper(Protocol[_R_co]):
cache_clear: Callable[[], None]
def __call__(self, *args: Hashable, **kwargs: Hashable) -> _R_co: ...
class _MethodCacheWrapper(Protocol[_R_co]):
def cache_clear(self) -> None: ...
def __call__(self, *args: Hashable, **kwargs: Hashable) -> _R_co: ...
# `compose()` overloads below will cover most use cases.
@overload
def compose(
__func1: Callable[[_R], _T],
__func2: Callable[_P, _R],
/,
) -> Callable[_P, _T]: ...
@overload
def compose(
__func1: Callable[[_R], _T],
__func2: Callable[[_R1], _R],
__func3: Callable[_P, _R1],
/,
) -> Callable[_P, _T]: ...
@overload
def compose(
__func1: Callable[[_R], _T],
__func2: Callable[[_R2], _R],
__func3: Callable[[_R1], _R2],
__func4: Callable[_P, _R1],
/,
) -> Callable[_P, _T]: ...
def once(func: Callable[_P, _R]) -> _OnceCallable[_P, _R]: ...
def method_cache(
method: Callable[..., _R],
cache_wrapper: Callable[[Callable[..., _R]], _MethodCacheWrapper[_R]] = ...,
) -> _MethodCacheWrapper[_R] | _ProxyMethodCacheWrapper[_R]: ...
def apply(
transform: Callable[[_R], _T]
) -> Callable[[Callable[_P, _R]], Callable[_P, _T]]: ...
def result_invoke(
action: Callable[[_R], Any]
) -> Callable[[Callable[_P, _R]], Callable[_P, _R]]: ...
def invoke(
f: Callable[_P, _R], /, *args: _P.args, **kwargs: _P.kwargs
) -> Callable[_P, _R]: ...
def call_aside(
f: Callable[_P, _R], *args: _P.args, **kwargs: _P.kwargs
) -> Callable[_P, _R]: ...
class Throttler(Generic[_R]):
last_called: float
func: Callable[..., _R]
max_rate: float
def __init__(
self, func: Callable[..., _R] | Throttler[_R], max_rate: float = ...
) -> None: ...
def reset(self) -> None: ...
def __call__(self, *args: Any, **kwargs: Any) -> _R: ...
def __get__(self, obj: Any, owner: type[Any] | None = ...) -> Callable[..., _R]: ...
def first_invoke(
func1: Callable[..., Any], func2: Callable[_P, _R]
) -> Callable[_P, _R]: ...
method_caller: Callable[..., methodcaller]
def retry_call(
func: Callable[..., _R],
cleanup: Callable[..., None] = ...,
retries: int | float = ...,
trap: type[BaseException] | tuple[type[BaseException], ...] = ...,
) -> _R: ...
def retry(
cleanup: Callable[..., None] = ...,
retries: int | float = ...,
trap: type[BaseException] | tuple[type[BaseException], ...] = ...,
) -> Callable[[Callable[..., _R]], Callable[..., _R]]: ...
def print_yielded(func: Callable[_P, Iterator[Any]]) -> Callable[_P, None]: ...
def pass_none(
func: Callable[Concatenate[_T, _P], _R]
) -> Callable[Concatenate[_T, _P], _R]: ...
def assign_params(
func: Callable[..., _R], namespace: dict[str, Any]
) -> partial[_R]: ...
def save_method_args(
method: Callable[Concatenate[_S, _P], _R]
) -> Callable[Concatenate[_S, _P], _R]: ...
def except_(
*exceptions: type[BaseException], replace: Any = ..., use: Any = ...
) -> Callable[[Callable[_P, Any]], Callable[_P, Any]]: ...
def identity(x: _T) -> _T: ...
def bypass_when(
check: _V, *, _op: Callable[[_V], Any] = ...
) -> Callable[[Callable[[_T], _R]], Callable[[_T], _T | _R]]: ...
def bypass_unless(
check: Any,
) -> Callable[[Callable[[_T], _R]], Callable[[_T], _T | _R]]: ...

View file

View file

@ -227,10 +227,12 @@ def unwrap(s):
return '\n'.join(cleaned)
lorem_ipsum: str = files(__name__).joinpath('Lorem ipsum.txt').read_text()
lorem_ipsum: str = (
files(__name__).joinpath('Lorem ipsum.txt').read_text(encoding='utf-8')
)
class Splitter(object):
class Splitter:
"""object that will split a string with the given arguments for each call
>>> s = Splitter(',')
@ -367,7 +369,7 @@ class WordSet(tuple):
return self.trim_left(item).trim_right(item)
def __getitem__(self, item):
result = super(WordSet, self).__getitem__(item)
result = super().__getitem__(item)
if isinstance(item, slice):
result = WordSet(result)
return result
@ -582,7 +584,7 @@ def join_continuation(lines):
['foobarbaz']
Not sure why, but...
The character preceeding the backslash is also elided.
The character preceding the backslash is also elided.
>>> list(join_continuation(['goo\\', 'dly']))
['godly']
@ -607,16 +609,16 @@ def read_newlines(filename, limit=1024):
r"""
>>> tmp_path = getfixture('tmp_path')
>>> filename = tmp_path / 'out.txt'
>>> _ = filename.write_text('foo\n', newline='')
>>> _ = filename.write_text('foo\n', newline='', encoding='utf-8')
>>> read_newlines(filename)
'\n'
>>> _ = filename.write_text('foo\r\n', newline='')
>>> _ = filename.write_text('foo\r\n', newline='', encoding='utf-8')
>>> read_newlines(filename)
'\r\n'
>>> _ = filename.write_text('foo\r\nbar\nbing\r', newline='')
>>> _ = filename.write_text('foo\r\nbar\nbing\r', newline='', encoding='utf-8')
>>> read_newlines(filename)
('\r', '\n', '\r\n')
"""
with open(filename) as fp:
with open(filename, encoding='utf-8') as fp:
fp.read(limit)
return fp.newlines

View file

@ -12,11 +12,11 @@ def report_newlines(filename):
>>> tmp_path = getfixture('tmp_path')
>>> filename = tmp_path / 'out.txt'
>>> _ = filename.write_text('foo\nbar\n', newline='')
>>> _ = filename.write_text('foo\nbar\n', newline='', encoding='utf-8')
>>> report_newlines(filename)
newline is '\n'
>>> filename = tmp_path / 'out.txt'
>>> _ = filename.write_text('foo\nbar\r\n', newline='')
>>> _ = filename.write_text('foo\nbar\r\n', newline='', encoding='utf-8')
>>> report_newlines(filename)
newlines are ('\n', '\r\n')
"""

View file

@ -0,0 +1,21 @@
import sys
import autocommand
from jaraco.text import Stripper
def strip_prefix():
r"""
Strip any common prefix from stdin.
>>> import io, pytest
>>> getfixture('monkeypatch').setattr('sys.stdin', io.StringIO('abcdef\nabc123'))
>>> strip_prefix()
def
123
"""
sys.stdout.writelines(Stripper.strip_prefix(sys.stdin).lines)
autocommand.autocommand(__name__)(strip_prefix)