Update jaraco.windows to v3.6:

Dependencies:
* backports.functools-lru-cache	1.2.1
* jaraco.classes	1.3
* jaraco.collections	1.3.2
* jaraco.functools	1.11
* jaraco.structures	1.0
* jaraco.text	1.7
* jaraco.ui	1.4
* jaraco.windows	3.6
* more-itertools	2.2
* path.py	8.2.1
* six	1.10.0
This commit is contained in:
Labrys 2016-06-06 12:38:26 -04:00
parent f093fafd8d
commit 6bb4ae56bd
30 changed files with 6271 additions and 10 deletions

View file

@ -0,0 +1 @@
import sys, types, os;p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('backports',));ie = os.path.exists(os.path.join(p,'__init__.py'));m = not ie and sys.modules.setdefault('backports', types.ModuleType('backports'));mp = (m or []) and m.__dict__.setdefault('__path__',[]);(p not in mp) and mp.append(p)

View file

@ -0,0 +1,184 @@
from __future__ import absolute_import
import functools
from collections import namedtuple
from threading import RLock
_CacheInfo = namedtuple("CacheInfo", ["hits", "misses", "maxsize", "currsize"])
@functools.wraps(functools.update_wrapper)
def update_wrapper(wrapper,
wrapped,
assigned = functools.WRAPPER_ASSIGNMENTS,
updated = functools.WRAPPER_UPDATES):
"""
Patch two bugs in functools.update_wrapper.
"""
# workaround for http://bugs.python.org/issue3445
assigned = tuple(attr for attr in assigned if hasattr(wrapped, attr))
wrapper = functools.update_wrapper(wrapper, wrapped, assigned, updated)
# workaround for https://bugs.python.org/issue17482
wrapper.__wrapped__ = wrapped
return wrapper
class _HashedSeq(list):
__slots__ = 'hashvalue'
def __init__(self, tup, hash=hash):
self[:] = tup
self.hashvalue = hash(tup)
def __hash__(self):
return self.hashvalue
def _make_key(args, kwds, typed,
kwd_mark=(object(),),
fasttypes=set([int, str, frozenset, type(None)]),
sorted=sorted, tuple=tuple, type=type, len=len):
'Make a cache key from optionally typed positional and keyword arguments'
key = args
if kwds:
sorted_items = sorted(kwds.items())
key += kwd_mark
for item in sorted_items:
key += item
if typed:
key += tuple(type(v) for v in args)
if kwds:
key += tuple(type(v) for k, v in sorted_items)
elif len(key) == 1 and type(key[0]) in fasttypes:
return key[0]
return _HashedSeq(key)
def lru_cache(maxsize=100, typed=False):
"""Least-recently-used cache decorator.
If *maxsize* is set to None, the LRU features are disabled and the cache
can grow without bound.
If *typed* is True, arguments of different types will be cached separately.
For example, f(3.0) and f(3) will be treated as distinct calls with
distinct results.
Arguments to the cached function must be hashable.
View the cache statistics named tuple (hits, misses, maxsize, currsize) with
f.cache_info(). Clear the cache and statistics with f.cache_clear().
Access the underlying function with f.__wrapped__.
See: http://en.wikipedia.org/wiki/Cache_algorithms#Least_Recently_Used
"""
# Users should only access the lru_cache through its public API:
# cache_info, cache_clear, and f.__wrapped__
# The internals of the lru_cache are encapsulated for thread safety and
# to allow the implementation to change (including a possible C version).
def decorating_function(user_function):
cache = dict()
stats = [0, 0] # make statistics updateable non-locally
HITS, MISSES = 0, 1 # names for the stats fields
make_key = _make_key
cache_get = cache.get # bound method to lookup key or return None
_len = len # localize the global len() function
lock = RLock() # because linkedlist updates aren't threadsafe
root = [] # root of the circular doubly linked list
root[:] = [root, root, None, None] # initialize by pointing to self
nonlocal_root = [root] # make updateable non-locally
PREV, NEXT, KEY, RESULT = 0, 1, 2, 3 # names for the link fields
if maxsize == 0:
def wrapper(*args, **kwds):
# no caching, just do a statistics update after a successful call
result = user_function(*args, **kwds)
stats[MISSES] += 1
return result
elif maxsize is None:
def wrapper(*args, **kwds):
# simple caching without ordering or size limit
key = make_key(args, kwds, typed)
result = cache_get(key, root) # root used here as a unique not-found sentinel
if result is not root:
stats[HITS] += 1
return result
result = user_function(*args, **kwds)
cache[key] = result
stats[MISSES] += 1
return result
else:
def wrapper(*args, **kwds):
# size limited caching that tracks accesses by recency
key = make_key(args, kwds, typed) if kwds or typed else args
with lock:
link = cache_get(key)
if link is not None:
# record recent use of the key by moving it to the front of the list
root, = nonlocal_root
link_prev, link_next, key, result = link
link_prev[NEXT] = link_next
link_next[PREV] = link_prev
last = root[PREV]
last[NEXT] = root[PREV] = link
link[PREV] = last
link[NEXT] = root
stats[HITS] += 1
return result
result = user_function(*args, **kwds)
with lock:
root, = nonlocal_root
if key in cache:
# getting here means that this same key was added to the
# cache while the lock was released. since the link
# update is already done, we need only return the
# computed result and update the count of misses.
pass
elif _len(cache) >= maxsize:
# use the old root to store the new key and result
oldroot = root
oldroot[KEY] = key
oldroot[RESULT] = result
# empty the oldest link and make it the new root
root = nonlocal_root[0] = oldroot[NEXT]
oldkey = root[KEY]
root[KEY] = root[RESULT] = None
# now update the cache dictionary for the new links
del cache[oldkey]
cache[key] = oldroot
else:
# put result in a new link at the front of the list
last = root[PREV]
link = [last, root, key, result]
last[NEXT] = root[PREV] = cache[key] = link
stats[MISSES] += 1
return result
def cache_info():
"""Report cache statistics"""
with lock:
return _CacheInfo(stats[HITS], stats[MISSES], maxsize, len(cache))
def cache_clear():
"""Clear the cache and cache statistics"""
with lock:
cache.clear()
root = nonlocal_root[0]
root[:] = [root, root, None, None]
stats[:] = [0, 0]
wrapper.__wrapped__ = user_function
wrapper.cache_info = cache_info
wrapper.cache_clear = cache_clear
return update_wrapper(wrapper, user_function)
return decorating_function

View file

@ -0,0 +1 @@
import sys, types, os;p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('jaraco',));ie = os.path.exists(os.path.join(p,'__init__.py'));m = not ie and sys.modules.setdefault('jaraco', types.ModuleType('jaraco'));mp = (m or []) and m.__dict__.setdefault('__path__',[]);(p not in mp) and mp.append(p)

View file

@ -0,0 +1 @@
import sys, types, os;p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('jaraco',));ie = os.path.exists(os.path.join(p,'__init__.py'));m = not ie and sys.modules.setdefault('jaraco', types.ModuleType('jaraco'));mp = (m or []) and m.__dict__.setdefault('__path__',[]);(p not in mp) and mp.append(p)

View file

@ -0,0 +1 @@
import sys, types, os;p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('jaraco',));ie = os.path.exists(os.path.join(p,'__init__.py'));m = not ie and sys.modules.setdefault('jaraco', types.ModuleType('jaraco'));mp = (m or []) and m.__dict__.setdefault('__path__',[]);(p not in mp) and mp.append(p)

View file

@ -0,0 +1 @@
import sys, types, os;p = os.path.join(sys._getframe(1).f_locals['sitedir'], *('jaraco',));ie = os.path.exists(os.path.join(p,'__init__.py'));m = not ie and sys.modules.setdefault('jaraco', types.ModuleType('jaraco'));mp = (m or []) and m.__dict__.setdefault('__path__',[]);(p not in mp) and mp.append(p)

View file

@ -1,10 +1 @@
# this is a namespace package __import__("pkg_resources").declare_namespace(__name__)
__import__('pkg_resources').declare_namespace(__name__)
try:
# py2exe support (http://www.py2exe.org/index.cgi/ExeWithEggs)
import modulefinder
for p in __path__:
modulefinder.AddPackagePath(__name__, p)
except ImportError:
pass

View file

View file

@ -0,0 +1,67 @@
"""
Routines for obtaining the class names
of an object and its parent classes.
"""
from __future__ import unicode_literals
def all_bases(c):
"""
return a tuple of all base classes the class c has as a parent.
>>> object in all_bases(list)
True
"""
return c.mro()[1:]
def all_classes(c):
"""
return a tuple of all classes to which c belongs
>>> list in all_classes(list)
True
"""
return c.mro()
# borrowed from http://code.activestate.com/recipes/576949-find-all-subclasses-of-a-given-class/
def iter_subclasses(cls, _seen=None):
"""
Generator over all subclasses of a given class, in depth-first order.
>>> bool in list(iter_subclasses(int))
True
>>> class A(object): pass
>>> class B(A): pass
>>> class C(A): pass
>>> class D(B,C): pass
>>> class E(D): pass
>>>
>>> for cls in iter_subclasses(A):
... print(cls.__name__)
B
D
E
C
>>> # get ALL (new-style) classes currently defined
>>> res = [cls.__name__ for cls in iter_subclasses(object)]
>>> 'type' in res
True
>>> 'tuple' in res
True
>>> len(res) > 100
True
"""
if not isinstance(cls, type):
raise TypeError('iter_subclasses must be called with '
'new-style classes, not %.100r' % cls)
if _seen is None: _seen = set()
try:
subs = cls.__subclasses__()
except TypeError: # fails only when cls is type
subs = cls.__subclasses__(cls)
for sub in subs:
if sub in _seen:
continue
_seen.add(sub)
yield sub
for sub in iter_subclasses(sub, _seen):
yield sub

View file

@ -0,0 +1,40 @@
"""
meta.py
Some useful metaclasses.
"""
from __future__ import unicode_literals
class LeafClassesMeta(type):
"""
A metaclass for classes that keeps track of all of them that
aren't base classes.
"""
_leaf_classes = set()
def __init__(cls, name, bases, attrs):
if not hasattr(cls, '_leaf_classes'):
cls._leaf_classes = set()
leaf_classes = getattr(cls, '_leaf_classes')
leaf_classes.add(cls)
# remove any base classes
leaf_classes -= set(bases)
class TagRegistered(type):
"""
As classes of this metaclass are created, they keep a registry in the
base class of all classes by a class attribute, indicated by attr_name.
"""
attr_name = 'tag'
def __init__(cls, name, bases, namespace):
super(TagRegistered, cls).__init__(name, bases, namespace)
if not hasattr(cls, '_registry'):
cls._registry = {}
meta = cls.__class__
attr = getattr(cls, meta.attr_name, None)
if attr:
cls._registry[attr] = cls

View file

@ -0,0 +1,65 @@
from __future__ import unicode_literals
import six
class NonDataProperty(object):
"""Much like the property builtin, but only implements __get__,
making it a non-data property, and can be subsequently reset.
See http://users.rcn.com/python/download/Descriptor.htm for more
information.
>>> class X(object):
... @NonDataProperty
... def foo(self):
... return 3
>>> x = X()
>>> x.foo
3
>>> x.foo = 4
>>> x.foo
4
"""
def __init__(self, fget):
assert fget is not None, "fget cannot be none"
assert six.callable(fget), "fget must be callable"
self.fget = fget
def __get__(self, obj, objtype=None):
if obj is None:
return self
return self.fget(obj)
# from http://stackoverflow.com/a/5191224
class ClassPropertyDescriptor(object):
def __init__(self, fget, fset=None):
self.fget = fget
self.fset = fset
def __get__(self, obj, klass=None):
if klass is None:
klass = type(obj)
return self.fget.__get__(obj, klass)()
def __set__(self, obj, value):
if not self.fset:
raise AttributeError("can't set attribute")
type_ = type(obj)
return self.fset.__get__(obj, type_)(value)
def setter(self, func):
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
self.fset = func
return self
def classproperty(func):
if not isinstance(func, (classmethod, staticmethod)):
func = classmethod(func)
return ClassPropertyDescriptor(func)

773
libs/jaraco/collections.py Normal file
View file

@ -0,0 +1,773 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import, unicode_literals, division
import re
import operator
import collections
import itertools
import copy
import six
from jaraco.classes.properties import NonDataProperty
import jaraco.text
class DictFilter(object):
"""
Takes a dict, and simulates a sub-dict based on the keys.
>>> sample = {'a': 1, 'b': 2, 'c': 3}
>>> filtered = DictFilter(sample, ['a', 'c'])
>>> filtered == {'a': 1, 'c': 3}
True
One can also filter by a regular expression pattern
>>> sample['d'] = 4
>>> sample['ef'] = 5
Here we filter for only single-character keys
>>> filtered = DictFilter(sample, include_pattern='.$')
>>> filtered == {'a': 1, 'b': 2, 'c': 3, 'd': 4}
True
Also note that DictFilter keeps a reference to the original dict, so
if you modify the original dict, that could modify the filtered dict.
>>> del sample['d']
>>> del sample['a']
>>> filtered == {'b': 2, 'c': 3}
True
"""
def __init__(self, dict, include_keys=[], include_pattern=None):
self.dict = dict
self.specified_keys = set(include_keys)
if include_pattern is not None:
self.include_pattern = re.compile(include_pattern)
else:
# for performance, replace the pattern_keys property
self.pattern_keys = set()
def get_pattern_keys(self):
#key_matches = lambda k, v: self.include_pattern.match(k)
keys = filter(self.include_pattern.match, self.dict.keys())
return set(keys)
pattern_keys = NonDataProperty(get_pattern_keys)
@property
def include_keys(self):
return self.specified_keys.union(self.pattern_keys)
def keys(self):
return self.include_keys.intersection(self.dict.keys())
def values(self):
keys = self.keys()
values = map(self.dict.get, keys)
return values
def __getitem__(self, i):
if not i in self.include_keys:
return KeyError, i
return self.dict[i]
def items(self):
keys = self.keys()
values = map(self.dict.get, keys)
return zip(keys, values)
def __eq__(self, other):
return dict(self) == other
def __ne__(self, other):
return dict(self) != other
def dict_map(function, dictionary):
"""
dict_map is much like the built-in function map. It takes a dictionary
and applys a function to the values of that dictionary, returning a
new dictionary with the mapped values in the original keys.
>>> d = dict_map(lambda x:x+1, dict(a=1, b=2))
>>> d == dict(a=2,b=3)
True
"""
return dict((key, function(value)) for key, value in dictionary.items())
class RangeMap(dict):
"""
A dictionary-like object that uses the keys as bounds for a range.
Inclusion of the value for that range is determined by the
key_match_comparator, which defaults to less-than-or-equal.
A value is returned for a key if it is the first key that matches in
the sorted list of keys.
One may supply keyword parameters to be passed to the sort function used
to sort keys (i.e. cmp [python 2 only], keys, reverse) as sort_params.
Let's create a map that maps 1-3 -> 'a', 4-6 -> 'b'
>>> r = RangeMap({3: 'a', 6: 'b'}) # boy, that was easy
>>> r[1], r[2], r[3], r[4], r[5], r[6]
('a', 'a', 'a', 'b', 'b', 'b')
Even float values should work so long as the comparison operator
supports it.
>>> r[4.5]
'b'
But you'll notice that the way rangemap is defined, it must be open-ended
on one side.
>>> r[0]
'a'
>>> r[-1]
'a'
One can close the open-end of the RangeMap by using undefined_value
>>> r = RangeMap({0: RangeMap.undefined_value, 3: 'a', 6: 'b'})
>>> r[0]
Traceback (most recent call last):
...
KeyError: 0
One can get the first or last elements in the range by using RangeMap.Item
>>> last_item = RangeMap.Item(-1)
>>> r[last_item]
'b'
.last_item is a shortcut for Item(-1)
>>> r[RangeMap.last_item]
'b'
Sometimes it's useful to find the bounds for a RangeMap
>>> r.bounds()
(0, 6)
RangeMap supports .get(key, default)
>>> r.get(0, 'not found')
'not found'
>>> r.get(7, 'not found')
'not found'
"""
def __init__(self, source, sort_params = {}, key_match_comparator = operator.le):
dict.__init__(self, source)
self.sort_params = sort_params
self.match = key_match_comparator
def __getitem__(self, item):
sorted_keys = sorted(self.keys(), **self.sort_params)
if isinstance(item, RangeMap.Item):
result = self.__getitem__(sorted_keys[item])
else:
key = self._find_first_match_(sorted_keys, item)
result = dict.__getitem__(self, key)
if result is RangeMap.undefined_value:
raise KeyError(key)
return result
def get(self, key, default=None):
"""
Return the value for key if key is in the dictionary, else default.
If default is not given, it defaults to None, so that this method
never raises a KeyError.
"""
try:
return self[key]
except KeyError:
return default
def _find_first_match_(self, keys, item):
is_match = lambda k: self.match(item, k)
matches = list(filter(is_match, keys))
if matches:
return matches[0]
raise KeyError(item)
def bounds(self):
sorted_keys = sorted(self.keys(), **self.sort_params)
return (
sorted_keys[RangeMap.first_item],
sorted_keys[RangeMap.last_item],
)
# some special values for the RangeMap
undefined_value = type(str('RangeValueUndefined'), (object,), {})()
class Item(int): pass
first_item = Item(0)
last_item = Item(-1)
__identity = lambda x: x
def sorted_items(d, key=__identity, reverse=False):
"""
Return the items of the dictionary sorted by the keys
>>> sample = dict(foo=20, bar=42, baz=10)
>>> tuple(sorted_items(sample))
(('bar', 42), ('baz', 10), ('foo', 20))
>>> reverse_string = lambda s: ''.join(reversed(s))
>>> tuple(sorted_items(sample, key=reverse_string))
(('foo', 20), ('bar', 42), ('baz', 10))
>>> tuple(sorted_items(sample, reverse=True))
(('foo', 20), ('baz', 10), ('bar', 42))
"""
# wrap the key func so it operates on the first element of each item
pairkey_key = lambda item: key(item[0])
return sorted(d.items(), key=pairkey_key, reverse=reverse)
class KeyTransformingDict(dict):
"""
A dict subclass that transforms the keys before they're used.
Subclasses may override the default transform_key to customize behavior.
"""
@staticmethod
def transform_key(key):
return key
def __init__(self, *args, **kargs):
super(KeyTransformingDict, self).__init__()
# build a dictionary using the default constructs
d = dict(*args, **kargs)
# build this dictionary using transformed keys.
for item in d.items():
self.__setitem__(*item)
def __setitem__(self, key, val):
key = self.transform_key(key)
super(KeyTransformingDict, self).__setitem__(key, val)
def __getitem__(self, key):
key = self.transform_key(key)
return super(KeyTransformingDict, self).__getitem__(key)
def __contains__(self, key):
key = self.transform_key(key)
return super(KeyTransformingDict, self).__contains__(key)
def __delitem__(self, key):
key = self.transform_key(key)
return super(KeyTransformingDict, self).__delitem__(key)
def get(self, key, *args, **kwargs):
key = self.transform_key(key)
return super(KeyTransformingDict, self).get(key, *args, **kwargs)
def setdefault(self, key, *args, **kwargs):
key = self.transform_key(key)
return super(KeyTransformingDict, self).setdefault(key, *args, **kwargs)
def pop(self, key, *args, **kwargs):
key = self.transform_key(key)
return super(KeyTransformingDict, self).pop(key, *args, **kwargs)
def matching_key_for(self, key):
"""
Given a key, return the actual key stored in self that matches.
Raise KeyError if the key isn't found.
"""
try:
return next(e_key for e_key in self.keys() if e_key == key)
except StopIteration:
raise KeyError(key)
class FoldedCaseKeyedDict(KeyTransformingDict):
"""
A case-insensitive dictionary (keys are compared as insensitive
if they are strings).
>>> d = FoldedCaseKeyedDict()
>>> d['heLlo'] = 'world'
>>> list(d.keys()) == ['heLlo']
True
>>> list(d.values()) == ['world']
True
>>> d['hello'] == 'world'
True
>>> 'hello' in d
True
>>> 'HELLO' in d
True
>>> print(repr(FoldedCaseKeyedDict({'heLlo': 'world'})).replace("u'", "'"))
{'heLlo': 'world'}
>>> d = FoldedCaseKeyedDict({'heLlo': 'world'})
>>> print(d['hello'])
world
>>> print(d['Hello'])
world
>>> list(d.keys())
['heLlo']
>>> d = FoldedCaseKeyedDict({'heLlo': 'world', 'Hello': 'world'})
>>> list(d.values())
['world']
>>> key, = d.keys()
>>> key in ['heLlo', 'Hello']
True
>>> del d['HELLO']
>>> d
{}
get should work
>>> d['Sumthin'] = 'else'
>>> d.get('SUMTHIN')
'else'
>>> d.get('OTHER', 'thing')
'thing'
>>> del d['sumthin']
setdefault should also work
>>> d['This'] = 'that'
>>> print(d.setdefault('this', 'other'))
that
>>> len(d)
1
>>> print(d['this'])
that
>>> print(d.setdefault('That', 'other'))
other
>>> print(d['THAT'])
other
Make it pop!
>>> print(d.pop('THAT'))
other
To retrieve the key in its originally-supplied form, use matching_key_for
>>> print(d.matching_key_for('this'))
This
"""
@staticmethod
def transform_key(key):
return jaraco.text.FoldedCase(key)
class DictAdapter(object):
"""
Provide a getitem interface for attributes of an object.
Let's say you want to get at the string.lowercase property in a formatted
string. It's easy with DictAdapter.
>>> import string
>>> print("lowercase is %(ascii_lowercase)s" % DictAdapter(string))
lowercase is abcdefghijklmnopqrstuvwxyz
"""
def __init__(self, wrapped_ob):
self.object = wrapped_ob
def __getitem__(self, name):
return getattr(self.object, name)
class ItemsAsAttributes(object):
"""
Mix-in class to enable a mapping object to provide items as
attributes.
>>> C = type(str('C'), (dict, ItemsAsAttributes), dict())
>>> i = C()
>>> i['foo'] = 'bar'
>>> i.foo
'bar'
Natural attribute access takes precedence
>>> i.foo = 'henry'
>>> i.foo
'henry'
But as you might expect, the mapping functionality is preserved.
>>> i['foo']
'bar'
A normal attribute error should be raised if an attribute is
requested that doesn't exist.
>>> i.missing
Traceback (most recent call last):
...
AttributeError: 'C' object has no attribute 'missing'
It also works on dicts that customize __getitem__
>>> missing_func = lambda self, key: 'missing item'
>>> C = type(str('C'), (dict, ItemsAsAttributes), dict(__missing__ = missing_func))
>>> i = C()
>>> i.missing
'missing item'
>>> i.foo
'missing item'
"""
def __getattr__(self, key):
try:
return getattr(super(ItemsAsAttributes, self), key)
except AttributeError as e:
# attempt to get the value from the mapping (return self[key])
# but be careful not to lose the original exception context.
noval = object()
def _safe_getitem(cont, key, missing_result):
try:
return cont[key]
except KeyError:
return missing_result
result = _safe_getitem(self, key, noval)
if result is not noval:
return result
# raise the original exception, but use the original class
# name, not 'super'.
message, = e.args
message = message.replace('super', self.__class__.__name__, 1)
e.args = message,
raise
def invert_map(map):
"""
Given a dictionary, return another dictionary with keys and values
switched. If any of the values resolve to the same key, raises
a ValueError.
>>> numbers = dict(a=1, b=2, c=3)
>>> letters = invert_map(numbers)
>>> letters[1]
'a'
>>> numbers['d'] = 3
>>> invert_map(numbers)
Traceback (most recent call last):
...
ValueError: Key conflict in inverted mapping
"""
res = dict((v,k) for k, v in map.items())
if not len(res) == len(map):
raise ValueError('Key conflict in inverted mapping')
return res
class IdentityOverrideMap(dict):
"""
A dictionary that by default maps each key to itself, but otherwise
acts like a normal dictionary.
>>> d = IdentityOverrideMap()
>>> d[42]
42
>>> d['speed'] = 'speedo'
>>> print(d['speed'])
speedo
"""
def __missing__(self, key):
return key
class DictStack(list, collections.Mapping):
"""
A stack of dictionaries that behaves as a view on those dictionaries,
giving preference to the last.
>>> stack = DictStack([dict(a=1, c=2), dict(b=2, a=2)])
>>> stack['a']
2
>>> stack['b']
2
>>> stack['c']
2
>>> stack.push(dict(a=3))
>>> stack['a']
3
>>> set(stack.keys()) == set(['a', 'b', 'c'])
True
>>> d = stack.pop()
>>> stack['a']
2
>>> d = stack.pop()
>>> stack['a']
1
"""
def keys(self):
return list(set(itertools.chain.from_iterable(c.keys() for c in self)))
def __getitem__(self, key):
for scope in reversed(self):
if key in scope: return scope[key]
raise KeyError(key)
push = list.append
class BijectiveMap(dict):
"""
A Bijective Map (two-way mapping).
Implemented as a simple dictionary of 2x the size, mapping values back
to keys.
Note, this implementation may be incomplete. If there's not a test for
your use case below, it's likely to fail, so please test and send pull
requests or patches for additional functionality needed.
>>> m = BijectiveMap()
>>> m['a'] = 'b'
>>> m == {'a': 'b', 'b': 'a'}
True
>>> print(m['b'])
a
>>> m['c'] = 'd'
>>> len(m)
2
Some weird things happen if you map an item to itself or overwrite a
single key of a pair, so it's disallowed.
>>> m['e'] = 'e'
Traceback (most recent call last):
ValueError: Key cannot map to itself
>>> m['d'] = 'e'
Traceback (most recent call last):
ValueError: Key/Value pairs may not overlap
>>> print(m.pop('d'))
c
>>> 'c' in m
False
>>> m = BijectiveMap(dict(a='b'))
>>> len(m)
1
>>> print(m['b'])
a
>>> m = BijectiveMap()
>>> m.update(a='b')
>>> m['b']
'a'
>>> del m['b']
>>> len(m)
0
>>> 'a' in m
False
"""
def __init__(self, *args, **kwargs):
super(BijectiveMap, self).__init__()
self.update(*args, **kwargs)
def __setitem__(self, item, value):
if item == value:
raise ValueError("Key cannot map to itself")
if (value in self or item in self) and self[item] != value:
raise ValueError("Key/Value pairs may not overlap")
super(BijectiveMap, self).__setitem__(item, value)
super(BijectiveMap, self).__setitem__(value, item)
def __delitem__(self, item):
self.pop(item)
def __len__(self):
return super(BijectiveMap, self).__len__() // 2
def pop(self, key, *args, **kwargs):
mirror = self[key]
super(BijectiveMap, self).__delitem__(mirror)
return super(BijectiveMap, self).pop(key, *args, **kwargs)
def update(self, *args, **kwargs):
# build a dictionary using the default constructs
d = dict(*args, **kwargs)
# build this dictionary using transformed keys.
for item in d.items():
self.__setitem__(*item)
class FrozenDict(collections.Mapping, collections.Hashable):
"""
An immutable mapping.
>>> a = FrozenDict(a=1, b=2)
>>> b = FrozenDict(a=1, b=2)
>>> a == b
True
>>> a == dict(a=1, b=2)
True
>>> dict(a=1, b=2) == a
True
>>> a['c'] = 3
Traceback (most recent call last):
...
TypeError: 'FrozenDict' object does not support item assignment
>>> a.update(y=3)
Traceback (most recent call last):
...
AttributeError: 'FrozenDict' object has no attribute 'update'
Copies should compare equal
>>> copy.copy(a) == a
True
Copies should be the same type
>>> isinstance(copy.copy(a), FrozenDict)
True
FrozenDict supplies .copy(), even though collections.Mapping doesn't
demand it.
>>> a.copy() == a
True
>>> a.copy() is not a
True
"""
__slots__ = ['__data']
def __new__(cls, *args, **kwargs):
self = super(FrozenDict, cls).__new__(cls)
self.__data = dict(*args, **kwargs)
return self
# Container
def __contains__(self, key):
return key in self.__data
# Hashable
def __hash__(self):
return hash(tuple(sorted(self.__data.iteritems())))
# Mapping
def __iter__(self):
return iter(self.__data)
def __len__(self):
return len(self.__data)
def __getitem__(self, key):
return self.__data[key]
# override get for efficiency provided by dict
def get(self, *args, **kwargs):
return self.__data.get(*args, **kwargs)
# override eq to recognize underlying implementation
def __eq__(self, other):
if isinstance(other, FrozenDict):
other = other.__data
return self.__data.__eq__(other)
def copy(self):
"Return a shallow copy of self"
return copy.copy(self)
class Enumeration(ItemsAsAttributes, BijectiveMap):
"""
A convenient way to provide enumerated values
>>> e = Enumeration('a b c')
>>> e['a']
0
>>> e.a
0
>>> e[1]
'b'
>>> set(e.names) == set('abc')
True
>>> set(e.codes) == set(range(3))
True
>>> e.get('d') is None
True
Codes need not start with 0
>>> e = Enumeration('a b c', range(1, 4))
>>> e['a']
1
>>> e[3]
'c'
"""
def __init__(self, names, codes=None):
if isinstance(names, six.string_types):
names = names.split()
if codes is None:
codes = itertools.count()
super(Enumeration, self).__init__(zip(names, codes))
@property
def names(self):
return (key for key in self if isinstance(key, six.string_types))
@property
def codes(self):
return (self[name] for name in self.names)
class Everything(object):
"""
A collection "containing" every possible thing.
>>> 'foo' in Everything()
True
>>> import random
>>> random.randint(1, 999) in Everything()
True
"""
def __contains__(self, other):
return True
class InstrumentedDict(six.moves.UserDict):
"""
Instrument an existing dictionary with additional
functionality, but always reference and mutate
the original dictionary.
>>> orig = {'a': 1, 'b': 2}
>>> inst = InstrumentedDict(orig)
>>> inst['a']
1
>>> inst['c'] = 3
>>> orig['c']
3
>>> inst.keys() == orig.keys()
True
"""
def __init__(self, data):
six.moves.UserDict.__init__(self)
self.data = data

268
libs/jaraco/functools.py Normal file
View file

@ -0,0 +1,268 @@
from __future__ import absolute_import, unicode_literals, print_function, division
import functools
import time
import warnings
try:
from functools import lru_cache
except ImportError:
try:
from backports.functools_lru_cache import lru_cache
except ImportError:
try:
from functools32 import lru_cache
except ImportError:
warnings.warn("No lru_cache available")
def compose(*funcs):
"""
Compose any number of unary functions into a single unary function.
>>> import textwrap
>>> from six import text_type
>>> text_type.strip(textwrap.dedent(compose.__doc__)) == compose(text_type.strip, textwrap.dedent)(compose.__doc__)
True
Compose also allows the innermost function to take arbitrary arguments.
>>> round_three = lambda x: round(x, ndigits=3)
>>> f = compose(round_three, int.__truediv__)
>>> [f(3*x, x+1) for x in range(1,10)]
[1.5, 2.0, 2.25, 2.4, 2.5, 2.571, 2.625, 2.667, 2.7]
"""
compose_two = lambda f1, f2: lambda *args, **kwargs: f1(f2(*args, **kwargs))
return functools.reduce(compose_two, funcs)
def method_caller(method_name, *args, **kwargs):
"""
Return a function that will call a named method on the
target object with optional positional and keyword
arguments.
>>> lower = method_caller('lower')
>>> lower('MyString')
'mystring'
"""
def call_method(target):
func = getattr(target, method_name)
return func(*args, **kwargs)
return call_method
def once(func):
"""
Decorate func so it's only ever called the first time.
This decorator can ensure that an expensive or non-idempotent function
will not be expensive on subsequent calls and is idempotent.
>>> func = once(lambda a: a+3)
>>> func(3)
6
>>> func(9)
6
>>> func('12')
6
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
if not hasattr(func, 'always_returns'):
func.always_returns = func(*args, **kwargs)
return func.always_returns
return wrapper
def method_cache(method, cache_wrapper=None):
"""
Wrap lru_cache to support storing the cache data in the object instances.
Abstracts the common paradigm where the method explicitly saves an
underscore-prefixed protected property on first call and returns that
subsequently.
>>> class MyClass:
... calls = 0
...
... @method_cache
... def method(self, value):
... self.calls += 1
... return value
>>> a = MyClass()
>>> a.method(3)
3
>>> for x in range(75):
... res = a.method(x)
>>> a.calls
75
Note that the apparent behavior will be exactly like that of lru_cache
except that the cache is stored on each instance, so values in one
instance will not flush values from another, and when an instance is
deleted, so are the cached values for that instance.
>>> b = MyClass()
>>> for x in range(35):
... res = b.method(x)
>>> b.calls
35
>>> a.method(0)
0
>>> a.calls
75
Note that if method had been decorated with ``functools.lru_cache()``,
a.calls would have been 76 (due to the cached value of 0 having been
flushed by the 'b' instance).
Clear the cache with ``.cache_clear()``
>>> a.method.cache_clear()
Another cache wrapper may be supplied:
>>> cache = lru_cache(maxsize=2)
>>> MyClass.method2 = method_cache(lambda self: 3, cache_wrapper=cache)
>>> a = MyClass()
>>> a.method2()
3
See also
http://code.activestate.com/recipes/577452-a-memoize-decorator-for-instance-methods/
for another implementation and additional justification.
"""
cache_wrapper = cache_wrapper or lru_cache()
def wrapper(self, *args, **kwargs):
# it's the first call, replace the method with a cached, bound method
bound_method = functools.partial(method, self)
cached_method = cache_wrapper(bound_method)
setattr(self, method.__name__, cached_method)
return cached_method(*args, **kwargs)
return _special_method_cache(method, cache_wrapper) or wrapper
def _special_method_cache(method, cache_wrapper):
"""
Because Python treats special methods differently, it's not
possible to use instance attributes to implement the cached
methods.
Instead, install the wrapper method under a different name
and return a simple proxy to that wrapper.
https://github.com/jaraco/jaraco.functools/issues/5
"""
name = method.__name__
special_names = '__getattr__', '__getitem__'
if name not in special_names:
return
wrapper_name = '__cached' + name
def proxy(self, *args, **kwargs):
if wrapper_name not in vars(self):
bound = functools.partial(method, self)
cache = cache_wrapper(bound)
setattr(self, wrapper_name, cache)
else:
cache = getattr(self, wrapper_name)
return cache(*args, **kwargs)
return proxy
def apply(transform):
"""
Decorate a function with a transform function that is
invoked on results returned from the decorated function.
>>> @apply(reversed)
... def get_numbers(start):
... return range(start, start+3)
>>> list(get_numbers(4))
[6, 5, 4]
"""
def wrap(func):
return compose(transform, func)
return wrap
def call_aside(f, *args, **kwargs):
"""
Call a function for its side effect after initialization.
>>> @call_aside
... def func(): print("called")
called
>>> func()
called
Use functools.partial to pass parameters to the initial call
>>> @functools.partial(call_aside, name='bingo')
... def func(name): print("called with", name)
called with bingo
"""
f(*args, **kwargs)
return f
class Throttler(object):
"""
Rate-limit a function (or other callable)
"""
def __init__(self, func, max_rate=float('Inf')):
if isinstance(func, Throttler):
func = func.func
self.func = func
self.max_rate = max_rate
self.reset()
def reset(self):
self.last_called = 0
def __call__(self, *args, **kwargs):
self._wait()
return self.func(*args, **kwargs)
def _wait(self):
"ensure at least 1/max_rate seconds from last call"
elapsed = time.time() - self.last_called
must_wait = 1 / self.max_rate - elapsed
time.sleep(max(0, must_wait))
self.last_called = time.time()
def __get__(self, obj, type=None):
return first_invoke(self._wait, functools.partial(self.func, obj))
def first_invoke(func1, func2):
"""
Return a function that when invoked will invoke func1 without
any parameters (for its side-effect) and then invoke func2
with whatever parameters were passed, returning its result.
"""
def wrapper(*args, **kwargs):
func1()
return func2(*args, **kwargs)
return wrapper
def retry_call(func, cleanup=lambda: None, retries=0, trap=()):
"""
Given a callable func, trap the indicated exceptions
for up to 'retries' times, invoking cleanup on the
exception. On the final attempt, allow any exceptions
to propagate.
"""
for attempt in range(retries):
try:
return func()
except trap:
cleanup()
return func()

View file

View file

@ -0,0 +1,130 @@
from __future__ import absolute_import, unicode_literals
from functools import reduce
def get_bit_values(number, size=32):
"""
Get bit values as a list for a given number
>>> get_bit_values(1) == [0]*31 + [1]
True
>>> get_bit_values(0xDEADBEEF)
[1, 1, 0, 1, 1, 1, 1, 0, 1, 0, 1, 0, 1, 1, 0, 1, 1, 0, 1, 1, 1, 1, 1, 0, 1, 1, 1, 0, 1, 1, 1, 1]
You may override the default word size of 32-bits to match your actual
application.
>>> get_bit_values(0x3, 2)
[1, 1]
>>> get_bit_values(0x3, 4)
[0, 0, 1, 1]
"""
number += 2**size
return list(map(int, bin(number)[-size:]))
def gen_bit_values(number):
"""
Return a zero or one for each bit of a numeric value up to the most
significant 1 bit, beginning with the least significant bit.
>>> list(gen_bit_values(16))
[0, 0, 0, 0, 1]
"""
digits = bin(number)[2:]
return map(int, reversed(digits))
def coalesce(bits):
"""
Take a sequence of bits, most significant first, and
coalesce them into a number.
>>> coalesce([1,0,1])
5
"""
operation = lambda a, b: (a << 1 | b)
return reduce(operation, bits)
class Flags(object):
"""
Subclasses should define _names, a list of flag names beginning
with the least-significant bit.
>>> class MyFlags(Flags):
... _names = 'a', 'b', 'c'
>>> mf = MyFlags.from_number(5)
>>> mf['a']
1
>>> mf['b']
0
>>> mf['c'] == mf[2]
True
>>> mf['b'] = 1
>>> mf['a'] = 0
>>> mf.number
6
"""
def __init__(self, values):
self._values = list(values)
if hasattr(self, '_names'):
n_missing_bits = len(self._names) - len(self._values)
self._values.extend([0] * n_missing_bits)
@classmethod
def from_number(cls, number):
return cls(gen_bit_values(number))
@property
def number(self):
return coalesce(reversed(self._values))
def __setitem__(self, key, value):
# first try by index, then by name
try:
self._values[key] = value
except TypeError:
index = self._names.index(key)
self._values[index] = value
def __getitem__(self, key):
# first try by index, then by name
try:
return self._values[key]
except TypeError:
index = self._names.index(key)
return self._values[index]
class BitMask(type):
"""
A metaclass to create a bitmask with attributes. Subclass an int and
set this as the metaclass to use.
Here's how to create such a class on Python 3:
class MyBits(int, metaclass=BitMask):
a = 0x1
b = 0x4
c = 0x3
For testing purposes, construct explicitly to support Python 2
>>> ns = dict(a=0x1, b=0x4, c=0x3)
>>> MyBits = BitMask(str('MyBits'), (int,), ns)
>>> b1 = MyBits(3)
>>> b1.a, b1.b, b1.c
(True, False, True)
>>> b2 = MyBits(8)
>>> any([b2.a, b2.b, b2.c])
False
"""
def __new__(cls, name, bases, attrs):
newattrs = dict(
(attr, property(lambda self, value=value: bool(self & value)))
for attr, value in attrs.items()
if not attr.startswith('_')
)
return type.__new__(cls, name, bases, newattrs)

371
libs/jaraco/text.py Normal file
View file

@ -0,0 +1,371 @@
from __future__ import absolute_import, unicode_literals, print_function
import sys
import re
import inspect
import itertools
import textwrap
import functools
import six
import jaraco.collections
from jaraco.functools import compose
def substitution(old, new):
"""
Return a function that will perform a substitution on a string
"""
return lambda s: s.replace(old, new)
def multi_substitution(*substitutions):
"""
Take a sequence of pairs specifying substitutions, and create
a function that performs those substitutions.
>>> multi_substitution(('foo', 'bar'), ('bar', 'baz'))('foo')
'baz'
"""
substitutions = itertools.starmap(substitution, substitutions)
# compose function applies last function first, so reverse the
# substitutions to get the expected order.
substitutions = reversed(tuple(substitutions))
return compose(*substitutions)
class FoldedCase(six.text_type):
"""
A case insensitive string class; behaves just like str
except compares equal when the only variation is case.
>>> s = FoldedCase('hello world')
>>> s == 'Hello World'
True
>>> 'Hello World' == s
True
>>> s.index('O')
4
>>> s.split('O')
['hell', ' w', 'rld']
>>> sorted(map(FoldedCase, ['GAMMA', 'alpha', 'Beta']))
['alpha', 'Beta', 'GAMMA']
"""
def __lt__(self, other):
return self.lower() < other.lower()
def __gt__(self, other):
return self.lower() > other.lower()
def __eq__(self, other):
return self.lower() == other.lower()
def __hash__(self):
return hash(self.lower())
# cache lower since it's likely to be called frequently.
def lower(self):
self._lower = super(FoldedCase, self).lower()
self.lower = lambda: self._lower
return self._lower
def index(self, sub):
return self.lower().index(sub.lower())
def split(self, splitter=' ', maxsplit=0):
pattern = re.compile(re.escape(splitter), re.I)
return pattern.split(self, maxsplit)
def local_format(string):
"""
format the string using variables in the caller's local namespace.
>>> a = 3
>>> local_format("{a:5}")
' 3'
"""
context = inspect.currentframe().f_back.f_locals
if sys.version_info < (3, 2):
return string.format(**context)
return string.format_map(context)
def global_format(string):
"""
format the string using variables in the caller's global namespace.
>>> a = 3
>>> fmt = "The func name: {global_format.__name__}"
>>> global_format(fmt)
'The func name: global_format'
"""
context = inspect.currentframe().f_back.f_globals
if sys.version_info < (3, 2):
return string.format(**context)
return string.format_map(context)
def namespace_format(string):
"""
Format the string using variable in the caller's scope (locals + globals).
>>> a = 3
>>> fmt = "A is {a} and this func is {namespace_format.__name__}"
>>> namespace_format(fmt)
'A is 3 and this func is namespace_format'
"""
context = jaraco.collections.DictStack()
context.push(inspect.currentframe().f_back.f_globals)
context.push(inspect.currentframe().f_back.f_locals)
if sys.version_info < (3, 2):
return string.format(**context)
return string.format_map(context)
def is_decodable(value):
r"""
Return True if the supplied value is decodable (using the default
encoding).
>>> is_decodable(b'\xff')
False
>>> is_decodable(b'\x32')
True
"""
# TODO: This code could be expressed more consisely and directly
# with a jaraco.context.ExceptionTrap, but that adds an unfortunate
# long dependency tree, so for now, use boolean literals.
try:
value.decode()
except UnicodeDecodeError:
return False
return True
def is_binary(value):
"""
Return True if the value appears to be binary (that is, it's a byte
string and isn't decodable).
"""
return isinstance(value, bytes) and not is_decodable(value)
def trim(s):
r"""
Trim something like a docstring to remove the whitespace that
is common due to indentation and formatting.
>>> trim("\n\tfoo = bar\n\t\tbar = baz\n")
'foo = bar\n\tbar = baz'
"""
return textwrap.dedent(s).strip()
class Splitter(object):
"""object that will split a string with the given arguments for each call
>>> s = Splitter(',')
>>> s('hello, world, this is your, master calling')
['hello', ' world', ' this is your', ' master calling']
"""
def __init__(self, *args):
self.args = args
def __call__(self, s):
return s.split(*self.args)
def indent(string, prefix=' ' * 4):
return prefix + string
class WordSet(tuple):
"""
Given a Python identifier, return the words that identifier represents,
whether in camel case, underscore-separated, etc.
>>> WordSet.parse("camelCase")
('camel', 'Case')
>>> WordSet.parse("under_sep")
('under', 'sep')
Acronyms should be retained
>>> WordSet.parse("firstSNL")
('first', 'SNL')
>>> WordSet.parse("you_and_I")
('you', 'and', 'I')
>>> WordSet.parse("A simple test")
('A', 'simple', 'test')
Multiple caps should not interfere with the first cap of another word.
>>> WordSet.parse("myABCClass")
('my', 'ABC', 'Class')
The result is a WordSet, so you can get the form you need.
>>> WordSet.parse("myABCClass").underscore_separated()
'my_ABC_Class'
>>> WordSet.parse('a-command').camel_case()
'ACommand'
>>> WordSet.parse('someIdentifier').lowered().space_separated()
'some identifier'
Slices of the result should return another WordSet.
>>> WordSet.parse('taken-out-of-context')[1:].underscore_separated()
'out_of_context'
>>> WordSet.from_class_name(WordSet()).lowered().space_separated()
'word set'
"""
_pattern = re.compile('([A-Z]?[a-z]+)|([A-Z]+(?![a-z]))')
def capitalized(self):
return WordSet(word.capitalize() for word in self)
def lowered(self):
return WordSet(word.lower() for word in self)
def camel_case(self):
return ''.join(self.capitalized())
def headless_camel_case(self):
words = iter(self)
first = next(words).lower()
return itertools.chain((first,), WordSet(words).camel_case())
def underscore_separated(self):
return '_'.join(self)
def dash_separated(self):
return '-'.join(self)
def space_separated(self):
return ' '.join(self)
def __getitem__(self, item):
result = super(WordSet, self).__getitem__(item)
if isinstance(item, slice):
result = WordSet(result)
return result
# for compatibility with Python 2
def __getslice__(self, i, j):
return self.__getitem__(slice(i, j))
@classmethod
def parse(cls, identifier):
matches = cls._pattern.finditer(identifier)
return WordSet(match.group(0) for match in matches)
@classmethod
def from_class_name(cls, subject):
return cls.parse(subject.__class__.__name__)
# for backward compatibility
words = WordSet.parse
def simple_html_strip(s):
r"""
Remove HTML from the string `s`.
>>> str(simple_html_strip(''))
''
>>> print(simple_html_strip('A <bold>stormy</bold> day in paradise'))
A stormy day in paradise
>>> print(simple_html_strip('Somebody <!-- do not --> tell the truth.'))
Somebody tell the truth.
>>> print(simple_html_strip('What about<br/>\nmultiple lines?'))
What about
multiple lines?
"""
html_stripper = re.compile('(<!--.*?-->)|(<[^>]*>)|([^<]+)', re.DOTALL)
texts = (
match.group(3) or ''
for match
in html_stripper.finditer(s)
)
return ''.join(texts)
class SeparatedValues(six.text_type):
"""
A string separated by a separator. Overrides __iter__ for getting
the values.
>>> list(SeparatedValues('a,b,c'))
['a', 'b', 'c']
Whitespace is stripped and empty values are discarded.
>>> list(SeparatedValues(' a, b , c, '))
['a', 'b', 'c']
"""
separator = ','
def __iter__(self):
parts = self.split(self.separator)
return six.moves.filter(None, (part.strip() for part in parts))
class Stripper:
r"""
Given a series of lines, find the common prefix and strip it from them.
>>> lines = [
... 'abcdefg\n',
... 'abc\n',
... 'abcde\n',
... ]
>>> res = Stripper.strip_prefix(lines)
>>> res.prefix
'abc'
>>> list(res.lines)
['defg\n', '\n', 'de\n']
If no prefix is common, nothing should be stripped.
>>> lines = [
... 'abcd\n',
... '1234\n',
... ]
>>> res = Stripper.strip_prefix(lines)
>>> res.prefix = ''
>>> list(res.lines)
['abcd\n', '1234\n']
"""
def __init__(self, prefix, lines):
self.prefix = prefix
self.lines = map(self, lines)
@classmethod
def strip_prefix(cls, lines):
prefix_lines, lines = itertools.tee(lines)
prefix = functools.reduce(cls.common_prefix, prefix_lines)
return cls(prefix, lines)
def __call__(self, line):
if not self.prefix:
return line
null, prefix, rest = line.partition(self.prefix)
return rest
@staticmethod
def common_prefix(s1, s2):
"""
Return the common prefix of two lines.
"""
index = min(len(s1), len(s2))
while s1[:index] != s2[:index]:
index -= 1
return s1[:index]

View file

62
libs/jaraco/ui/cmdline.py Normal file
View file

@ -0,0 +1,62 @@
import argparse
import six
from jaraco.classes import meta
from jaraco import text
@six.add_metaclass(meta.LeafClassesMeta)
class Command(object):
"""
A general-purpose base class for creating commands for a command-line
program using argparse. Each subclass of Command represents a separate
sub-command of a program.
For example, one might use Command subclasses to implement the Mercurial
command set::
class Commit(Command):
@staticmethod
def add_arguments(cls, parser):
parser.add_argument('-m', '--message')
@classmethod
def run(cls, args):
"Run the 'commit' command with args (parsed)"
class Merge(Command): pass
class Pull(Command): pass
...
Then one could create an entry point for Mercurial like so::
def hg_command():
Command.invoke()
"""
@classmethod
def add_subparsers(cls, parser):
subparsers = parser.add_subparsers()
[cmd_class.add_parser(subparsers) for cmd_class in cls._leaf_classes]
@classmethod
def add_parser(cls, subparsers):
cmd_string = text.words(cls.__name__).lowered().dash_separated()
parser = subparsers.add_parser(cmd_string)
parser.set_defaults(action=cls)
cls.add_arguments(parser)
return parser
@classmethod
def add_arguments(cls, parser):
pass
@classmethod
def invoke(cls):
"""
Invoke the command using ArgumentParser
"""
parser = argparse.ArgumentParser()
cls.add_subparsers(parser)
args = parser.parse_args()
args.action.run(args)

108
libs/jaraco/ui/editor.py Normal file
View file

@ -0,0 +1,108 @@
from __future__ import unicode_literals, absolute_import
import tempfile
import os
import sys
import subprocess
import mimetypes
import collections
import io
import difflib
import six
class EditProcessException(RuntimeError): pass
class EditableFile(object):
"""
EditableFile saves some data to a temporary file, launches a
platform editor for interactive editing, and then reloads the data,
setting .changed to True if the data was edited.
e.g.::
x = EditableFile('foo')
x.edit()
if x.changed:
print(x.data)
The EDITOR environment variable can define which executable to use
(also XML_EDITOR if the content-type to edit includes 'xml'). If no
EDITOR is defined, defaults to 'notepad' on Windows and 'edit' on
other platforms.
"""
platform_default_editors = collections.defaultdict(
lambda: 'edit',
win32 = 'notepad',
linux2 = 'vi',
)
encoding = 'utf-8'
def __init__(self, data='', content_type='text/plain'):
self.data = six.text_type(data)
self.content_type = content_type
def __enter__(self):
extension = mimetypes.guess_extension(self.content_type) or ''
fobj, self.name = tempfile.mkstemp(extension)
os.write(fobj, self.data.encode(self.encoding))
os.close(fobj)
return self
def read(self):
with open(self.name, 'rb') as f:
return f.read().decode(self.encoding)
def __exit__(self, *tb_info):
os.remove(self.name)
def edit(self):
"""
Edit the file
"""
self.changed = False
with self:
editor = self.get_editor()
cmd = [editor, self.name]
try:
res = subprocess.call(cmd)
except Exception as e:
print("Error launching editor %(editor)s" % locals())
print(e)
return
if res != 0:
msg = '%(editor)s returned error status %(res)d' % locals()
raise EditProcessException(msg)
new_data = self.read()
if new_data != self.data:
self.changed = self._save_diff(self.data, new_data)
self.data = new_data
@staticmethod
def _search_env(keys):
"""
Search the environment for the supplied keys, returning the first
one found or None if none was found.
"""
matches = (os.environ[key] for key in keys if key in os.environ)
return next(matches, None)
def get_editor(self):
"""
Give preference to an XML_EDITOR or EDITOR defined in the
environment. Otherwise use a default editor based on platform.
"""
env_search = ['EDITOR']
if 'xml' in self.content_type:
env_search.insert(0, 'XML_EDITOR')
default_editor = self.platform_default_editors[sys.platform]
return self._search_env(env_search) or default_editor
@staticmethod
def _save_diff(*versions):
def get_lines(content):
return list(io.StringIO(content))
lines = map(get_lines, versions)
diff = difflib.context_diff(*lines)
return tuple(diff)

26
libs/jaraco/ui/input.py Normal file
View file

@ -0,0 +1,26 @@
"""
This module currently provides a cross-platform getch function
"""
try:
# Windows
from msvcrt import getch
except ImportError:
pass
try:
# Unix
import sys
import tty
import termios
def getch():
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
try:
tty.setraw(fd)
return sys.stdin.read(1)
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
except ImportError:
pass

34
libs/jaraco/ui/menu.py Normal file
View file

@ -0,0 +1,34 @@
from __future__ import print_function, absolute_import, unicode_literals
import itertools
import six
class Menu(object):
"""
A simple command-line based menu
"""
def __init__(self, choices=None, formatter=str):
self.choices = choices or list()
self.formatter = formatter
def get_choice(self, prompt="> "):
n = len(self.choices)
number_width = len(str(n)) + 1
menu_fmt = '{number:{number_width}}) {choice}'
formatted_choices = map(self.formatter, self.choices)
for number, choice in zip(itertools.count(1), formatted_choices):
print(menu_fmt.format(**locals()))
print()
try:
answer = int(six.moves.input(prompt))
result = self.choices[answer - 1]
except ValueError:
print('invalid selection')
result = None
except IndexError:
print('invalid selection')
result = None
except KeyboardInterrupt:
result = None
return result

150
libs/jaraco/ui/progress.py Normal file
View file

@ -0,0 +1,150 @@
from __future__ import (print_function, absolute_import, unicode_literals,
division)
import time
import sys
import itertools
import abc
import datetime
import six
@six.add_metaclass(abc.ABCMeta)
class AbstractProgressBar(object):
def __init__(self, unit='', size=70):
"""
Size is the nominal size in characters
"""
self.unit = unit
self.size = size
def report(self, amt):
sys.stdout.write('\r%s' % self.get_bar(amt))
sys.stdout.flush()
@abc.abstractmethod
def get_bar(self, amt):
"Return the string to be printed. Should be size >= self.size"
def summary(self, str):
return ' (' + self.unit_str(str) + ')'
def unit_str(self, str):
if self.unit:
str += ' ' + self.unit
return str
def finish(self):
print()
def __enter__(self):
self.report(0)
return self
def __exit__(self, exc, exc_val, tb):
if exc is None:
self.finish()
else:
print()
def iterate(self, iterable):
"""
Report the status as the iterable is consumed.
"""
with self:
for n, item in enumerate(iterable, 1):
self.report(n)
yield item
class SimpleProgressBar(AbstractProgressBar):
_PROG_DISPGLYPH = itertools.cycle(['|', '/', '-', '\\'])
def get_bar(self, amt):
bar = next(self._PROG_DISPGLYPH)
template = ' [{bar:^{bar_len}}]'
summary = self.summary('{amt}')
template += summary
empty = template.format(
bar='',
bar_len=0,
amt=amt,
)
bar_len = self.size - len(empty)
return template.format(**locals())
@classmethod
def demo(cls):
bar3 = cls(unit='cubes', size=30)
with bar3:
for x in six.moves.range(1, 759):
bar3.report(x)
time.sleep(0.01)
class TargetProgressBar(AbstractProgressBar):
def __init__(self, total=None, unit='', size=70):
"""
Size is the nominal size in characters
"""
self.total = total
super(TargetProgressBar, self).__init__(unit, size)
def get_bar(self, amt):
template = ' [{bar:<{bar_len}}]'
completed = amt / self.total
percent = int(completed * 100)
percent_str = ' {percent:3}%'
template += percent_str
summary = self.summary('{amt}/{total}')
template += summary
empty = template.format(
total=self.total,
bar='',
bar_len=0,
**locals()
)
bar_len = self.size - len(empty)
bar = '=' * int(completed * bar_len)
return template.format(total=self.total, **locals())
@classmethod
def demo(cls):
bar1 = cls(100, 'blocks')
with bar1:
for x in six.moves.range(1, 101):
bar1.report(x)
time.sleep(0.05)
bar2 = cls(758, size=50)
with bar2:
for x in six.moves.range(1, 759):
bar2.report(x)
time.sleep(0.01)
def finish(self):
self.report(self.total)
super(TargetProgressBar, self).finish()
def countdown(template, duration=datetime.timedelta(seconds=5)):
"""
Do a countdown for duration, printing the template (which may accept one
positional argument). Template should be something like
``countdown complete in {} seconds.``
"""
now = datetime.datetime.now()
deadline = now + duration
remaining = deadline - datetime.datetime.now()
while remaining:
remaining = deadline - datetime.datetime.now()
remaining = max(datetime.timedelta(), remaining)
msg = template.format(remaining.total_seconds())
print(msg, end=' '*10)
sys.stdout.flush()
time.sleep(.1)
print('\b'*80, end='')
sys.stdout.flush()
print()

View file

@ -0,0 +1,2 @@
from more_itertools.more import *
from more_itertools.recipes import *

237
libs/more_itertools/more.py Normal file
View file

@ -0,0 +1,237 @@
from functools import partial, wraps
from itertools import izip_longest
from recipes import *
__all__ = ['chunked', 'first', 'peekable', 'collate', 'consumer', 'ilen',
'iterate', 'with_iter']
_marker = object()
def chunked(iterable, n):
"""Break an iterable into lists of a given length::
>>> list(chunked([1, 2, 3, 4, 5, 6, 7], 3))
[[1, 2, 3], [4, 5, 6], [7]]
If the length of ``iterable`` is not evenly divisible by ``n``, the last
returned list will be shorter.
This is useful for splitting up a computation on a large number of keys
into batches, to be pickled and sent off to worker processes. One example
is operations on rows in MySQL, which does not implement server-side
cursors properly and would otherwise load the entire dataset into RAM on
the client.
"""
# Doesn't seem to run into any number-of-args limits.
for group in (list(g) for g in izip_longest(*[iter(iterable)] * n,
fillvalue=_marker)):
if group[-1] is _marker:
# If this is the last group, shuck off the padding:
del group[group.index(_marker):]
yield group
def first(iterable, default=_marker):
"""Return the first item of an iterable, ``default`` if there is none.
>>> first(xrange(4))
0
>>> first(xrange(0), 'some default')
'some default'
If ``default`` is not provided and there are no items in the iterable,
raise ``ValueError``.
``first()`` is useful when you have a generator of expensive-to-retrieve
values and want any arbitrary one. It is marginally shorter than
``next(iter(...))`` but saves you an entire ``try``/``except`` when you
want to provide a fallback value.
"""
try:
return next(iter(iterable))
except StopIteration:
# I'm on the edge about raising ValueError instead of StopIteration. At
# the moment, ValueError wins, because the caller could conceivably
# want to do something different with flow control when I raise the
# exception, and it's weird to explicitly catch StopIteration.
if default is _marker:
raise ValueError('first() was called on an empty iterable, and no '
'default value was provided.')
return default
class peekable(object):
"""Wrapper for an iterator to allow 1-item lookahead
Call ``peek()`` on the result to get the value that will next pop out of
``next()``, without advancing the iterator:
>>> p = peekable(xrange(2))
>>> p.peek()
0
>>> p.next()
0
>>> p.peek()
1
>>> p.next()
1
Pass ``peek()`` a default value, and it will be returned in the case where
the iterator is exhausted:
>>> p = peekable([])
>>> p.peek('hi')
'hi'
If no default is provided, ``peek()`` raises ``StopIteration`` when there
are no items left.
To test whether there are more items in the iterator, examine the
peekable's truth value. If it is truthy, there are more items.
>>> assert peekable(xrange(1))
>>> assert not peekable([])
"""
# Lowercase to blend in with itertools. The fact that it's a class is an
# implementation detail.
def __init__(self, iterable):
self._it = iter(iterable)
def __iter__(self):
return self
def __nonzero__(self):
try:
self.peek()
except StopIteration:
return False
return True
def peek(self, default=_marker):
"""Return the item that will be next returned from ``next()``.
Return ``default`` if there are no items left. If ``default`` is not
provided, raise ``StopIteration``.
"""
if not hasattr(self, '_peek'):
try:
self._peek = self._it.next()
except StopIteration:
if default is _marker:
raise
return default
return self._peek
def next(self):
ret = self.peek()
del self._peek
return ret
def collate(*iterables, **kwargs):
"""Return a sorted merge of the items from each of several already-sorted
``iterables``.
>>> list(collate('ACDZ', 'AZ', 'JKL'))
['A', 'A', 'C', 'D', 'J', 'K', 'L', 'Z', 'Z']
Works lazily, keeping only the next value from each iterable in memory. Use
``collate()`` to, for example, perform a n-way mergesort of items that
don't fit in memory.
:arg key: A function that returns a comparison value for an item. Defaults
to the identity function.
:arg reverse: If ``reverse=True``, yield results in descending order
rather than ascending. ``iterables`` must also yield their elements in
descending order.
If the elements of the passed-in iterables are out of order, you might get
unexpected results.
"""
key = kwargs.pop('key', lambda a: a)
reverse = kwargs.pop('reverse', False)
min_or_max = partial(max if reverse else min, key=lambda (a, b): a)
peekables = [peekable(it) for it in iterables]
peekables = [p for p in peekables if p] # Kill empties.
while peekables:
_, p = min_or_max((key(p.peek()), p) for p in peekables)
yield p.next()
peekables = [p for p in peekables if p]
def consumer(func):
"""Decorator that automatically advances a PEP-342-style "reverse iterator"
to its first yield point so you don't have to call ``next()`` on it
manually.
>>> @consumer
... def tally():
... i = 0
... while True:
... print 'Thing number %s is %s.' % (i, (yield))
... i += 1
...
>>> t = tally()
>>> t.send('red')
Thing number 0 is red.
>>> t.send('fish')
Thing number 1 is fish.
Without the decorator, you would have to call ``t.next()`` before
``t.send()`` could be used.
"""
@wraps(func)
def wrapper(*args, **kwargs):
gen = func(*args, **kwargs)
gen.next()
return gen
return wrapper
def ilen(iterable):
"""Return the number of items in ``iterable``.
>>> from itertools import ifilter
>>> ilen(ifilter(lambda x: x % 3 == 0, xrange(1000000)))
333334
This does, of course, consume the iterable, so handle it with care.
"""
return sum(1 for _ in iterable)
def iterate(func, start):
"""Return ``start``, ``func(start)``, ``func(func(start))``, ...
>>> from itertools import islice
>>> list(islice(iterate(lambda x: 2*x, 1), 10))
[1, 2, 4, 8, 16, 32, 64, 128, 256, 512]
"""
while True:
yield start
start = func(start)
def with_iter(context_manager):
"""Wrap an iterable in a ``with`` statement, so it closes once exhausted.
Example::
upper_lines = (line.upper() for line in with_iter(open('foo')))
"""
with context_manager as iterable:
for item in iterable:
yield item

View file

@ -0,0 +1,331 @@
"""Imported from the recipes section of the itertools documentation.
All functions taken from the recipes section of the itertools library docs
[1]_.
Some backward-compatible usability improvements have been made.
.. [1] http://docs.python.org/library/itertools.html#recipes
"""
from collections import deque
from itertools import chain, combinations, count, cycle, groupby, ifilterfalse, imap, islice, izip, izip_longest, repeat, starmap, tee # Wrapping breaks 2to3.
import operator
from random import randrange, sample, choice
__all__ = ['take', 'tabulate', 'consume', 'nth', 'quantify', 'padnone',
'ncycles', 'dotproduct', 'flatten', 'repeatfunc', 'pairwise',
'grouper', 'roundrobin', 'powerset', 'unique_everseen',
'unique_justseen', 'iter_except', 'random_product',
'random_permutation', 'random_combination',
'random_combination_with_replacement']
def take(n, iterable):
"""Return first n items of the iterable as a list
>>> take(3, range(10))
[0, 1, 2]
>>> take(5, range(3))
[0, 1, 2]
Effectively a short replacement for ``next`` based iterator consumption
when you want more than one item, but less than the whole iterator.
"""
return list(islice(iterable, n))
def tabulate(function, start=0):
"""Return an iterator mapping the function over linear input.
The start argument will be increased by 1 each time the iterator is called
and fed into the function.
>>> t = tabulate(lambda x: x**2, -3)
>>> take(3, t)
[9, 4, 1]
"""
return imap(function, count(start))
def consume(iterator, n=None):
"""Advance the iterator n-steps ahead. If n is none, consume entirely.
Efficiently exhausts an iterator without returning values. Defaults to
consuming the whole iterator, but an optional second argument may be
provided to limit consumption.
>>> i = (x for x in range(10))
>>> next(i)
0
>>> consume(i, 3)
>>> next(i)
4
>>> consume(i)
>>> next(i)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
If the iterator has fewer items remaining than the provided limit, the
whole iterator will be consumed.
>>> i = (x for x in range(3))
>>> consume(i, 5)
>>> next(i)
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
StopIteration
"""
# Use functions that consume iterators at C speed.
if n is None:
# feed the entire iterator into a zero-length deque
deque(iterator, maxlen=0)
else:
# advance to the empty slice starting at position n
next(islice(iterator, n, n), None)
def nth(iterable, n, default=None):
"""Returns the nth item or a default value
>>> l = range(10)
>>> nth(l, 3)
3
>>> nth(l, 20, "zebra")
'zebra'
"""
return next(islice(iterable, n, None), default)
def quantify(iterable, pred=bool):
"""Return the how many times the predicate is true
>>> quantify([True, False, True])
2
"""
return sum(imap(pred, iterable))
def padnone(iterable):
"""Returns the sequence of elements and then returns None indefinitely.
>>> take(5, padnone(range(3)))
[0, 1, 2, None, None]
Useful for emulating the behavior of the built-in map() function.
"""
return chain(iterable, repeat(None))
def ncycles(iterable, n):
"""Returns the sequence elements n times
>>> list(ncycles(["a", "b"], 3))
['a', 'b', 'a', 'b', 'a', 'b']
"""
return chain.from_iterable(repeat(tuple(iterable), n))
def dotproduct(vec1, vec2):
"""Returns the dot product of the two iterables
>>> dotproduct([10, 10], [20, 20])
400
"""
return sum(imap(operator.mul, vec1, vec2))
def flatten(listOfLists):
"""Return an iterator flattening one level of nesting in a list of lists
>>> list(flatten([[0, 1], [2, 3]]))
[0, 1, 2, 3]
"""
return chain.from_iterable(listOfLists)
def repeatfunc(func, times=None, *args):
"""Repeat calls to func with specified arguments.
>>> list(repeatfunc(lambda: 5, 3))
[5, 5, 5]
>>> list(repeatfunc(lambda x: x ** 2, 3, 3))
[9, 9, 9]
"""
if times is None:
return starmap(func, repeat(args))
return starmap(func, repeat(args, times))
def pairwise(iterable):
"""Returns an iterator of paired items, overlapping, from the original
>>> take(4, pairwise(count()))
[(0, 1), (1, 2), (2, 3), (3, 4)]
"""
a, b = tee(iterable)
next(b, None)
return izip(a, b)
def grouper(n, iterable, fillvalue=None):
"""Collect data into fixed-length chunks or blocks
>>> list(grouper(3, 'ABCDEFG', 'x'))
[('A', 'B', 'C'), ('D', 'E', 'F'), ('G', 'x', 'x')]
"""
args = [iter(iterable)] * n
return izip_longest(fillvalue=fillvalue, *args)
def roundrobin(*iterables):
"""Yields an item from each iterable, alternating between them
>>> list(roundrobin('ABC', 'D', 'EF'))
['A', 'D', 'E', 'B', 'F', 'C']
"""
# Recipe credited to George Sakkis
pending = len(iterables)
nexts = cycle(iter(it).next for it in iterables)
while pending:
try:
for next in nexts:
yield next()
except StopIteration:
pending -= 1
nexts = cycle(islice(nexts, pending))
def powerset(iterable):
"""Yields all possible subsets of the iterable
>>> list(powerset([1,2,3]))
[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)]
"""
s = list(iterable)
return chain.from_iterable(combinations(s, r) for r in range(len(s)+1))
def unique_everseen(iterable, key=None):
"""Yield unique elements, preserving order.
>>> list(unique_everseen('AAAABBBCCDAABBB'))
['A', 'B', 'C', 'D']
>>> list(unique_everseen('ABBCcAD', str.lower))
['A', 'B', 'C', 'D']
"""
seen = set()
seen_add = seen.add
if key is None:
for element in ifilterfalse(seen.__contains__, iterable):
seen_add(element)
yield element
else:
for element in iterable:
k = key(element)
if k not in seen:
seen_add(k)
yield element
def unique_justseen(iterable, key=None):
"""Yields elements in order, ignoring serial duplicates
>>> list(unique_justseen('AAAABBBCCDAABBB'))
['A', 'B', 'C', 'D', 'A', 'B']
>>> list(unique_justseen('ABBCcAD', str.lower))
['A', 'B', 'C', 'A', 'D']
"""
return imap(next, imap(operator.itemgetter(1), groupby(iterable, key)))
def iter_except(func, exception, first=None):
"""Yields results from a function repeatedly until an exception is raised.
Converts a call-until-exception interface to an iterator interface.
Like __builtin__.iter(func, sentinel) but uses an exception instead
of a sentinel to end the loop.
>>> l = range(3)
>>> list(iter_except(l.pop, IndexError))
[2, 1, 0]
"""
try:
if first is not None:
yield first()
while 1:
yield func()
except exception:
pass
def random_product(*args, **kwds):
"""Returns a random pairing of items from each iterable argument
If `repeat` is provided as a kwarg, it's value will be used to indicate
how many pairings should be chosen.
>>> random_product(['a', 'b', 'c'], [1, 2], repeat=2) # doctest:+SKIP
('b', '2', 'c', '2')
"""
pools = map(tuple, args) * kwds.get('repeat', 1)
return tuple(choice(pool) for pool in pools)
def random_permutation(iterable, r=None):
"""Returns a random permutation.
If r is provided, the permutation is truncated to length r.
>>> random_permutation(range(5)) # doctest:+SKIP
(3, 4, 0, 1, 2)
"""
pool = tuple(iterable)
r = len(pool) if r is None else r
return tuple(sample(pool, r))
def random_combination(iterable, r):
"""Returns a random combination of length r, chosen without replacement.
>>> random_combination(range(5), 3) # doctest:+SKIP
(2, 3, 4)
"""
pool = tuple(iterable)
n = len(pool)
indices = sorted(sample(xrange(n), r))
return tuple(pool[i] for i in indices)
def random_combination_with_replacement(iterable, r):
"""Returns a random combination of length r, chosen with replacement.
>>> random_combination_with_replacement(range(3), 5) # # doctest:+SKIP
(0, 0, 1, 2, 2)
"""
pool = tuple(iterable)
n = len(pool)
indices = sorted(randrange(n) for i in xrange(r))
return tuple(pool[i] for i in indices)

View file

View file

@ -0,0 +1,143 @@
from contextlib import closing
from itertools import islice, ifilter
from StringIO import StringIO
from unittest import TestCase
from nose.tools import eq_, assert_raises
from more_itertools import * # Test all the symbols are in __all__.
class CollateTests(TestCase):
"""Unit tests for ``collate()``"""
# Also accidentally tests peekable, though that could use its own tests
def test_default(self):
"""Test with the default `key` function."""
iterables = [xrange(4), xrange(7), xrange(3, 6)]
eq_(sorted(reduce(list.__add__, [list(it) for it in iterables])),
list(collate(*iterables)))
def test_key(self):
"""Test using a custom `key` function."""
iterables = [xrange(5, 0, -1), xrange(4, 0, -1)]
eq_(list(sorted(reduce(list.__add__,
[list(it) for it in iterables]),
reverse=True)),
list(collate(*iterables, key=lambda x: -x)))
def test_empty(self):
"""Be nice if passed an empty list of iterables."""
eq_([], list(collate()))
def test_one(self):
"""Work when only 1 iterable is passed."""
eq_([0, 1], list(collate(xrange(2))))
def test_reverse(self):
"""Test the `reverse` kwarg."""
iterables = [xrange(4, 0, -1), xrange(7, 0, -1), xrange(3, 6, -1)]
eq_(sorted(reduce(list.__add__, [list(it) for it in iterables]),
reverse=True),
list(collate(*iterables, reverse=True)))
class ChunkedTests(TestCase):
"""Tests for ``chunked()``"""
def test_even(self):
"""Test when ``n`` divides evenly into the length of the iterable."""
eq_(list(chunked('ABCDEF', 3)), [['A', 'B', 'C'], ['D', 'E', 'F']])
def test_odd(self):
"""Test when ``n`` does not divide evenly into the length of the
iterable.
"""
eq_(list(chunked('ABCDE', 3)), [['A', 'B', 'C'], ['D', 'E']])
class FirstTests(TestCase):
"""Tests for ``first()``"""
def test_many(self):
"""Test that it works on many-item iterables."""
# Also try it on a generator expression to make sure it works on
# whatever those return, across Python versions.
eq_(first(x for x in xrange(4)), 0)
def test_one(self):
"""Test that it doesn't raise StopIteration prematurely."""
eq_(first([3]), 3)
def test_empty_stop_iteration(self):
"""It should raise StopIteration for empty iterables."""
assert_raises(ValueError, first, [])
def test_default(self):
"""It should return the provided default arg for empty iterables."""
eq_(first([], 'boo'), 'boo')
class PeekableTests(TestCase):
"""Tests for ``peekable()`` behavor not incidentally covered by testing
``collate()``
"""
def test_peek_default(self):
"""Make sure passing a default into ``peek()`` works."""
p = peekable([])
eq_(p.peek(7), 7)
def test_truthiness(self):
"""Make sure a ``peekable`` tests true iff there are items remaining in
the iterable.
"""
p = peekable([])
self.failIf(p)
p = peekable(xrange(3))
self.failUnless(p)
def test_simple_peeking(self):
"""Make sure ``next`` and ``peek`` advance and don't advance the
iterator, respectively.
"""
p = peekable(xrange(10))
eq_(p.next(), 0)
eq_(p.peek(), 1)
eq_(p.next(), 1)
class ConsumerTests(TestCase):
"""Tests for ``consumer()``"""
def test_consumer(self):
@consumer
def eater():
while True:
x = yield
e = eater()
e.send('hi') # without @consumer, would raise TypeError
def test_ilen():
"""Sanity-check ``ilen()``."""
eq_(ilen(ifilter(lambda x: x % 10 == 0, range(101))), 11)
def test_with_iter():
"""Make sure ``with_iter`` iterates over and closes things correctly."""
s = StringIO('One fish\nTwo fish')
initial_words = [line.split()[0] for line in with_iter(closing(s))]
eq_(initial_words, ['One', 'Two'])
# Make sure closing happened:
try:
list(s)
except ValueError: # "I/O operation on closed file"
pass
else:
raise AssertionError('StringIO object was not closed.')

View file

@ -0,0 +1,433 @@
from random import seed
from unittest import TestCase
from nose.tools import eq_, assert_raises, ok_
from more_itertools import *
def setup_module():
seed(1337)
class TakeTests(TestCase):
"""Tests for ``take()``"""
def test_simple_take(self):
"""Test basic usage"""
t = take(5, xrange(10))
eq_(t, [0, 1, 2, 3, 4])
def test_null_take(self):
"""Check the null case"""
t = take(0, xrange(10))
eq_(t, [])
def test_negative_take(self):
"""Make sure taking negative items results in a ValueError"""
assert_raises(ValueError, take, -3, xrange(10))
def test_take_too_much(self):
"""Taking more than an iterator has remaining should return what the
iterator has remaining.
"""
t = take(10, xrange(5))
eq_(t, [0, 1, 2, 3, 4])
class TabulateTests(TestCase):
"""Tests for ``tabulate()``"""
def test_simple_tabulate(self):
"""Test the happy path"""
t = tabulate(lambda x: x)
f = tuple([next(t) for _ in range(3)])
eq_(f, (0, 1, 2))
def test_count(self):
"""Ensure tabulate accepts specific count"""
t = tabulate(lambda x: 2 * x, -1)
f = (next(t), next(t), next(t))
eq_(f, (-2, 0, 2))
class ConsumeTests(TestCase):
"""Tests for ``consume()``"""
def test_sanity(self):
"""Test basic functionality"""
r = (x for x in range(10))
consume(r, 3)
eq_(3, next(r))
def test_null_consume(self):
"""Check the null case"""
r = (x for x in range(10))
consume(r, 0)
eq_(0, next(r))
def test_negative_consume(self):
"""Check that negative consumsion throws an error"""
r = (x for x in range(10))
assert_raises(ValueError, consume, r, -1)
def test_total_consume(self):
"""Check that iterator is totally consumed by default"""
r = (x for x in range(10))
consume(r)
assert_raises(StopIteration, next, r)
class NthTests(TestCase):
"""Tests for ``nth()``"""
def test_basic(self):
"""Make sure the nth item is returned"""
l = range(10)
for i, v in enumerate(l):
eq_(nth(l, i), v)
def test_default(self):
"""Ensure a default value is returned when nth item not found"""
l = range(3)
eq_(nth(l, 100, "zebra"), "zebra")
def test_negative_item_raises(self):
"""Ensure asking for a negative item raises an exception"""
assert_raises(ValueError, nth, range(10), -3)
class QuantifyTests(TestCase):
"""Tests for ``quantify()``"""
def test_happy_path(self):
"""Make sure True count is returned"""
q = [True, False, True]
eq_(quantify(q), 2)
def test_custom_predicate(self):
"""Ensure non-default predicates return as expected"""
q = range(10)
eq_(quantify(q, lambda x: x % 2 == 0), 5)
class PadnoneTests(TestCase):
"""Tests for ``padnone()``"""
def test_happy_path(self):
"""wrapper iterator should return None indefinitely"""
r = range(2)
p = padnone(r)
eq_([0, 1, None, None], [next(p) for _ in range(4)])
class NcyclesTests(TestCase):
"""Tests for ``nyclces()``"""
def test_happy_path(self):
"""cycle a sequence three times"""
r = ["a", "b", "c"]
n = ncycles(r, 3)
eq_(["a", "b", "c", "a", "b", "c", "a", "b", "c"],
list(n))
def test_null_case(self):
"""asking for 0 cycles should return an empty iterator"""
n = ncycles(range(100), 0)
assert_raises(StopIteration, next, n)
def test_pathalogical_case(self):
"""asking for negative cycles should return an empty iterator"""
n = ncycles(range(100), -10)
assert_raises(StopIteration, next, n)
class DotproductTests(TestCase):
"""Tests for ``dotproduct()``'"""
def test_happy_path(self):
"""simple dotproduct example"""
eq_(400, dotproduct([10, 10], [20, 20]))
class FlattenTests(TestCase):
"""Tests for ``flatten()``"""
def test_basic_usage(self):
"""ensure list of lists is flattened one level"""
f = [[0, 1, 2], [3, 4, 5]]
eq_(range(6), list(flatten(f)))
def test_single_level(self):
"""ensure list of lists is flattened only one level"""
f = [[0, [1, 2]], [[3, 4], 5]]
eq_([0, [1, 2], [3, 4], 5], list(flatten(f)))
class RepeatfuncTests(TestCase):
"""Tests for ``repeatfunc()``"""
def test_simple_repeat(self):
"""test simple repeated functions"""
r = repeatfunc(lambda: 5)
eq_([5, 5, 5, 5, 5], [next(r) for _ in range(5)])
def test_finite_repeat(self):
"""ensure limited repeat when times is provided"""
r = repeatfunc(lambda: 5, times=5)
eq_([5, 5, 5, 5, 5], list(r))
def test_added_arguments(self):
"""ensure arguments are applied to the function"""
r = repeatfunc(lambda x: x, 2, 3)
eq_([3, 3], list(r))
def test_null_times(self):
"""repeat 0 should return an empty iterator"""
r = repeatfunc(range, 0, 3)
assert_raises(StopIteration, next, r)
class PairwiseTests(TestCase):
"""Tests for ``pairwise()``"""
def test_base_case(self):
"""ensure an iterable will return pairwise"""
p = pairwise([1, 2, 3])
eq_([(1, 2), (2, 3)], list(p))
def test_short_case(self):
"""ensure an empty iterator if there's not enough values to pair"""
p = pairwise("a")
assert_raises(StopIteration, next, p)
class GrouperTests(TestCase):
"""Tests for ``grouper()``"""
def test_even(self):
"""Test when group size divides evenly into the length of
the iterable.
"""
eq_(list(grouper(3, 'ABCDEF')), [('A', 'B', 'C'), ('D', 'E', 'F')])
def test_odd(self):
"""Test when group size does not divide evenly into the length of the
iterable.
"""
eq_(list(grouper(3, 'ABCDE')), [('A', 'B', 'C'), ('D', 'E', None)])
def test_fill_value(self):
"""Test that the fill value is used to pad the final group"""
eq_(list(grouper(3, 'ABCDE', 'x')), [('A', 'B', 'C'), ('D', 'E', 'x')])
class RoundrobinTests(TestCase):
"""Tests for ``roundrobin()``"""
def test_even_groups(self):
"""Ensure ordered output from evenly populated iterables"""
eq_(list(roundrobin('ABC', [1, 2, 3], range(3))),
['A', 1, 0, 'B', 2, 1, 'C', 3, 2])
def test_uneven_groups(self):
"""Ensure ordered output from unevenly populated iterables"""
eq_(list(roundrobin('ABCD', [1, 2], range(0))),
['A', 1, 'B', 2, 'C', 'D'])
class PowersetTests(TestCase):
"""Tests for ``powerset()``"""
def test_combinatorics(self):
"""Ensure a proper enumeration"""
p = powerset([1, 2, 3])
eq_(list(p),
[(), (1,), (2,), (3,), (1, 2), (1, 3), (2, 3), (1, 2, 3)])
class UniqueEverseenTests(TestCase):
"""Tests for ``unique_everseen()``"""
def test_everseen(self):
"""ensure duplicate elements are ignored"""
u = unique_everseen('AAAABBBBCCDAABBB')
eq_(['A', 'B', 'C', 'D'],
list(u))
def test_custom_key(self):
"""ensure the custom key comparison works"""
u = unique_everseen('aAbACCc', key=str.lower)
eq_(list('abC'), list(u))
class UniqueJustseenTests(TestCase):
"""Tests for ``unique_justseen()``"""
def test_justseen(self):
"""ensure only last item is remembered"""
u = unique_justseen('AAAABBBCCDABB')
eq_(list('ABCDAB'), list(u))
def test_custom_key(self):
"""ensure the custom key comparison works"""
u = unique_justseen('AABCcAD', str.lower)
eq_(list('ABCAD'), list(u))
class IterExceptTests(TestCase):
"""Tests for ``iter_except()``"""
def test_exact_exception(self):
"""ensure the exact specified exception is caught"""
l = [1, 2, 3]
i = iter_except(l.pop, IndexError)
eq_(list(i), [3, 2, 1])
def test_generic_exception(self):
"""ensure the generic exception can be caught"""
l = [1, 2]
i = iter_except(l.pop, Exception)
eq_(list(i), [2, 1])
def test_uncaught_exception_is_raised(self):
"""ensure a non-specified exception is raised"""
l = [1, 2, 3]
i = iter_except(l.pop, KeyError)
assert_raises(IndexError, list, i)
def test_first(self):
"""ensure first is run before the function"""
l = [1, 2, 3]
f = lambda: 25
i = iter_except(l.pop, IndexError, f)
eq_(list(i), [25, 3, 2, 1])
class RandomProductTests(TestCase):
"""Tests for ``random_product()``
Since random.choice() has different results with the same seed across
python versions 2.x and 3.x, these tests use highly probably events to
create predictable outcomes across platforms.
"""
def test_simple_lists(self):
"""Ensure that one item is chosen from each list in each pair.
Also ensure that each item from each list eventually appears in
the chosen combinations.
Odds are roughly 1 in 7.1 * 10e16 that one item from either list will
not be chosen after 100 samplings of one item from each list. Just to
be safe, better use a known random seed, too.
"""
nums = [1, 2, 3]
lets = ['a', 'b', 'c']
n, m = zip(*[random_product(nums, lets) for _ in range(100)])
n, m = set(n), set(m)
eq_(n, set(nums))
eq_(m, set(lets))
eq_(len(n), len(nums))
eq_(len(m), len(lets))
def test_list_with_repeat(self):
"""ensure multiple items are chosen, and that they appear to be chosen
from one list then the next, in proper order.
"""
nums = [1, 2, 3]
lets = ['a', 'b', 'c']
r = list(random_product(nums, lets, repeat=100))
eq_(2 * 100, len(r))
n, m = set(r[::2]), set(r[1::2])
eq_(n, set(nums))
eq_(m, set(lets))
eq_(len(n), len(nums))
eq_(len(m), len(lets))
class RandomPermutationTests(TestCase):
"""Tests for ``random_permutation()``"""
def test_full_permutation(self):
"""ensure every item from the iterable is returned in a new ordering
15 elements have a 1 in 1.3 * 10e12 of appearing in sorted order, so
we fix a seed value just to be sure.
"""
i = range(15)
r = random_permutation(i)
eq_(set(i), set(r))
if i == r:
raise AssertionError("Values were not permuted")
def test_partial_permutation(self):
"""ensure all returned items are from the iterable, that the returned
permutation is of the desired length, and that all items eventually
get returned.
Sampling 100 permutations of length 5 from a set of 15 leaves a
(2/3)^100 chance that an item will not be chosen. Multiplied by 15
items, there is a 1 in 2.6e16 chance that at least 1 item will not
show up in the resulting output. Using a random seed will fix that.
"""
items = range(15)
item_set = set(items)
all_items = set()
for _ in xrange(100):
permutation = random_permutation(items, 5)
eq_(len(permutation), 5)
permutation_set = set(permutation)
ok_(permutation_set <= item_set)
all_items |= permutation_set
eq_(all_items, item_set)
class RandomCombinationTests(TestCase):
"""Tests for ``random_combination()``"""
def test_psuedorandomness(self):
"""ensure different subsets of the iterable get returned over many
samplings of random combinations"""
items = range(15)
all_items = set()
for _ in xrange(50):
combination = random_combination(items, 5)
all_items |= set(combination)
eq_(all_items, set(items))
def test_no_replacement(self):
"""ensure that elements are sampled without replacement"""
items = range(15)
for _ in xrange(50):
combination = random_combination(items, len(items))
eq_(len(combination), len(set(combination)))
assert_raises(ValueError, random_combination, items, len(items) + 1)
class RandomCombinationWithReplacementTests(TestCase):
"""Tests for ``random_combination_with_replacement()``"""
def test_replacement(self):
"""ensure that elements are sampled with replacement"""
items = range(5)
combo = random_combination_with_replacement(items, len(items) * 2)
eq_(2 * len(items), len(combo))
if len(set(combo)) == len(combo):
raise AssertionError("Combination contained no duplicates")
def test_psuedorandomness(self):
"""ensure different subsets of the iterable get returned over many
samplings of random combinations"""
items = range(15)
all_items = set()
for _ in xrange(50):
combination = random_combination_with_replacement(items, 5)
all_items |= set(combination)
eq_(all_items, set(items))

1722
libs/path.py Normal file

File diff suppressed because it is too large Load diff

1119
libs/test_path.py Normal file

File diff suppressed because it is too large Load diff