Update futures to 3.2.0

This commit is contained in:
Labrys of Knossos 2018-12-16 10:24:14 -05:00
commit 49c9ea1350
5 changed files with 204 additions and 185 deletions

View file

@ -14,5 +14,10 @@ from concurrent.futures._base import (FIRST_COMPLETED,
Executor,
wait,
as_completed)
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor
try:
from concurrent.futures.process import ProcessPoolExecutor
except ImportError:
# some platforms don't have multiprocessing
pass

View file

@ -1,15 +1,12 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
from __future__ import with_statement
import collections
import logging
import threading
import itertools
import time
try:
from collections import namedtuple
except ImportError:
from concurrent.futures._compat import namedtuple
import types
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
@ -175,6 +172,29 @@ def _create_and_install_waiters(fs, return_when):
return waiter
def _yield_finished_futures(fs, waiter, ref_collect):
    """
    Iterate on the list *fs*, yielding finished futures one by one in
    reverse order.

    Before yielding a future, *waiter* is removed from its waiters
    and the future is removed from each set in the collection of sets
    *ref_collect*.

    The aim of this function is to avoid keeping stale references after
    the future is yielded and before the iterator resumes.

    Args:
        fs: List of futures. It is consumed from the end, one element per
            yielded value, and is empty once the generator is exhausted.
        waiter: The object to remove from each future's ``_waiters`` list.
        ref_collect: Collection of sets; every yielded future is removed
            from each of them first.
    """
    while fs:
        # Look at the last future without popping it yet, so it can be
        # detached from the bookkeeping structures first.
        f = fs[-1]
        for futures_set in ref_collect:
            futures_set.remove(f)
        # Detach the waiter while holding the future's condition.
        with f._condition:
            f._waiters.remove(waiter)
        # Drop the local name so this frame does not reference the future
        # while the generator is suspended at the yield below.
        del f
        # Careful not to keep a reference to the popped value
        yield fs.pop()
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
@ -186,7 +206,8 @@ def as_completed(fs, timeout=None):
Returns:
An iterator that yields the given Futures as they complete (finished or
cancelled).
cancelled). If any given Futures are duplicated, they will be returned
once.
Raises:
TimeoutError: If the entire result iterator could not be generated
@ -195,16 +216,20 @@ def as_completed(fs, timeout=None):
if timeout is not None:
end_time = timeout + time.time()
fs = set(fs)
total_futures = len(fs)
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = set(fs) - finished
pending = fs - finished
waiter = _create_and_install_waiters(fs, _AS_COMPLETED)
finished = list(finished)
try:
for future in finished:
yield future
for f in _yield_finished_futures(finished, waiter,
ref_collect=(fs,)):
f = [f]
yield f.pop()
while pending:
if timeout is None:
@ -214,7 +239,7 @@ def as_completed(fs, timeout=None):
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
len(pending), total_futures))
waiter.event.wait(wait_timeout)
@ -223,15 +248,20 @@ def as_completed(fs, timeout=None):
waiter.finished_futures = []
waiter.event.clear()
for future in finished:
yield future
pending.remove(future)
# reverse to keep finishing order
finished.reverse()
for f in _yield_finished_futures(finished, waiter,
ref_collect=(fs, pending)):
f = [f]
yield f.pop()
finally:
# Remove waiter from unfinished futures
for f in fs:
f._waiters.remove(waiter)
with f._condition:
f._waiters.remove(waiter)
DoneAndNotDoneFutures = namedtuple(
DoneAndNotDoneFutures = collections.namedtuple(
'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the futures in the given sequence to complete.
@ -276,7 +306,8 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED):
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
with f._condition:
f._waiters.remove(waiter)
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)
@ -290,6 +321,7 @@ class Future(object):
self._state = PENDING
self._result = None
self._exception = None
self._traceback = None
self._waiters = []
self._done_callbacks = []
@ -299,22 +331,41 @@ class Future(object):
callback(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
except BaseException:
# Explicitly let all other new-style exceptions through so
# that we can catch all old-style exceptions with a simple
# "except:" clause below.
#
# All old-style exception objects are instances of
# types.InstanceType, but "except types.InstanceType:" does
# not catch old-style exceptions for some reason. Thus, the
# only way to catch all old-style exceptions without catching
# any new-style exceptions is to filter out the new-style
# exceptions, which all derive from BaseException.
raise
except:
# Because of the BaseException clause above, this handler only
# executes for old-style exception objects.
LOGGER.exception('exception calling callback for %r', self)
def __repr__(self):
with self._condition:
if self._state == FINISHED:
if self._exception:
return '<Future at %s state=%s raised %s>' % (
hex(id(self)),
return '<%s at %#x state=%s raised %s>' % (
self.__class__.__name__,
id(self),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
return '<Future at %s state=%s returned %s>' % (
hex(id(self)),
return '<%s at %#x state=%s returned %s>' % (
self.__class__.__name__,
id(self),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
return '<Future at %s state=%s>' % (
hex(id(self)),
return '<%s at %#x state=%s>' % (
self.__class__.__name__,
id(self),
_STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
@ -337,7 +388,7 @@ class Future(object):
return True
def cancelled(self):
"""Return True if the future has cancelled."""
"""Return True if the future was cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
@ -353,7 +404,14 @@ class Future(object):
def __get_result(self):
if self._exception:
raise self._exception
if isinstance(self._exception, types.InstanceType):
# The exception is an instance of an old-style class, which
# means type(self._exception) returns types.ClassType instead
# of the exception's actual class type.
exception_type = self._exception.__class__
else:
exception_type = type(self._exception)
raise exception_type, self._exception, self._traceback
else:
return self._result
@ -405,6 +463,39 @@ class Future(object):
else:
raise TimeoutError()
def exception_info(self, timeout=None):
    """Return the (exception, traceback) pair recorded for this future.

    Args:
        timeout: The number of seconds to wait for the future to finish
            if it isn't done yet. If None, then there is no limit on the
            wait time.

    Returns:
        A 2-tuple ``(exception, traceback)`` holding the values stored on
        the future. Both attributes are initialised to None, so a future
        on which no exception was set yields ``(None, None)``.

    Raises:
        CancelledError: If the future was cancelled.
        TimeoutError: If the future didn't finish executing before the
            given timeout.
    """
    cancelled_states = [CANCELLED, CANCELLED_AND_NOTIFIED]
    with self._condition:
        # Fast path: the outcome is already known, no need to wait.
        if self._state in cancelled_states:
            raise CancelledError()
        if self._state == FINISHED:
            return self._exception, self._traceback

        # Block until notified of a state change or until the timeout
        # elapses, then look at the state one more time.
        self._condition.wait(timeout)

        if self._state in cancelled_states:
            raise CancelledError()
        if self._state == FINISHED:
            return self._exception, self._traceback
        # Still neither cancelled nor finished after waiting.
        raise TimeoutError()
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
@ -422,21 +513,7 @@ class Future(object):
TimeoutError: If the future didn't finish executing before the given
timeout.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
else:
raise TimeoutError()
return self.exception_info(timeout)[0]
# The following methods should only be used by Executors and in tests.
def set_running_or_notify_cancel(self):
@ -475,8 +552,8 @@ class Future(object):
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
id(self.future),
self.future._state)
id(self),
self._state)
raise RuntimeError('Future in unexpected state')
def set_result(self, result):
@ -492,19 +569,28 @@ class Future(object):
self._condition.notify_all()
self._invoke_callbacks()
def set_exception(self, exception):
"""Sets the result of the future as being the given exception.
def set_exception_info(self, exception, traceback):
    """Record *exception* and *traceback* as the outcome of the future.

    The future is moved to the FINISHED state, every registered waiter is
    told about the exception, threads blocked on the future's condition
    are woken up, and finally the done-callbacks are invoked.

    Should only be used by Executor implementations and unit tests.

    Args:
        exception: The exception object to store on the future.
        traceback: The traceback to store alongside it (may be None).
    """
    cond = self._condition
    with cond:
        self._exception, self._traceback = exception, traceback
        self._state = FINISHED
        for observer in self._waiters:
            observer.add_exception(self)
        cond.notify_all()
    # Callbacks are run after the condition has been released.
    self._invoke_callbacks()
def set_exception(self, exception):
    """Sets the result of the future as being the given exception.

    Delegates to set_exception_info() with None as the traceback.

    Should only be used by Executor implementations and unit tests.

    Args:
        exception: The exception object to store on the future.
    """
    self.set_exception_info(exception, None)
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
@ -520,7 +606,7 @@ class Executor(object):
raise NotImplementedError()
def map(self, fn, *iterables, **kwargs):
"""Returns a iterator equivalent to map(fn, iter).
"""Returns an iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take as many arguments as there are
@ -541,17 +627,24 @@ class Executor(object):
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)]
try:
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.time())
finally:
for future in fs:
future.cancel()
# Yield must be hidden in closure so that the futures are submitted
# before the first iterator value is required.
def result_iterator():
try:
# reverse so that fs.pop() returns the futures in submission order
fs.reverse()
while fs:
# Careful not to keep a reference to the popped future
if timeout is None:
yield fs.pop().result()
else:
yield fs.pop().result(end_time - time.time())
finally:
for future in fs:
future.cancel()
return result_iterator()
def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.

View file

@ -1,101 +0,0 @@
from keyword import iskeyword as _iskeyword
from operator import itemgetter as _itemgetter
import sys as _sys
def namedtuple(typename, field_names):
"""Returns a new subclass of tuple with named fields.
>>> Point = namedtuple('Point', 'x y')
>>> Point.__doc__ # docstring for the new class
'Point(x, y)'
>>> p = Point(11, y=22) # instantiate with positional args or keywords
>>> p[0] + p[1] # indexable like a plain tuple
33
>>> x, y = p # unpack like a regular tuple
>>> x, y
(11, 22)
>>> p.x + p.y # fields also accessable by name
33
>>> d = p._asdict() # convert to a dictionary
>>> d['x']
11
>>> Point(**d) # convert from a dictionary
Point(x=11, y=22)
>>> p._replace(x=100) # _replace() is like str.replace() but targets named fields
Point(x=100, y=22)
"""
# Parse and validate the field names. Validation serves two purposes,
# generating informative error messages and preventing template injection attacks.
if isinstance(field_names, basestring):
field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas
field_names = tuple(map(str, field_names))
for name in (typename,) + field_names:
if not all(c.isalnum() or c=='_' for c in name):
raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name)
if _iskeyword(name):
raise ValueError('Type names and field names cannot be a keyword: %r' % name)
if name[0].isdigit():
raise ValueError('Type names and field names cannot start with a number: %r' % name)
seen_names = set()
for name in field_names:
if name.startswith('_'):
raise ValueError('Field names cannot start with an underscore: %r' % name)
if name in seen_names:
raise ValueError('Encountered duplicate field name: %r' % name)
seen_names.add(name)
# Create and fill-in the class template
numfields = len(field_names)
argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes
reprtxt = ', '.join('%s=%%r' % name for name in field_names)
dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names))
template = '''class %(typename)s(tuple):
'%(typename)s(%(argtxt)s)' \n
__slots__ = () \n
_fields = %(field_names)r \n
def __new__(_cls, %(argtxt)s):
return _tuple.__new__(_cls, (%(argtxt)s)) \n
@classmethod
def _make(cls, iterable, new=tuple.__new__, len=len):
'Make a new %(typename)s object from a sequence or iterable'
result = new(cls, iterable)
if len(result) != %(numfields)d:
raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result))
return result \n
def __repr__(self):
return '%(typename)s(%(reprtxt)s)' %% self \n
def _asdict(t):
'Return a new dict which maps field names to their values'
return {%(dicttxt)s} \n
def _replace(_self, **kwds):
'Return a new %(typename)s object replacing specified fields with new values'
result = _self._make(map(kwds.pop, %(field_names)r, _self))
if kwds:
raise ValueError('Got unexpected field names: %%r' %% kwds.keys())
return result \n
def __getnewargs__(self):
return tuple(self) \n\n''' % locals()
for i, name in enumerate(field_names):
template += ' %s = _property(_itemgetter(%d))\n' % (name, i)
# Execute the template string in a temporary namespace and
# support tracing utilities by setting a value for frame.f_globals['__name__']
namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename,
_property=property, _tuple=tuple)
try:
exec(template, namespace)
except SyntaxError:
e = _sys.exc_info()[1]
raise SyntaxError(e.message + ':\n' + template)
result = namespace[typename]
# For pickling to work, the __module__ variable needs to be set to the frame
# where the named tuple is created. Bypass this step in environments where
# sys._getframe is not defined (Jython for example).
if hasattr(_sys, '_getframe'):
result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__')
return result

View file

@ -43,20 +43,14 @@ Process #1..n:
_ResultItems in "Request Q"
"""
from __future__ import with_statement
import atexit
from concurrent.futures import _base
import Queue as queue
import multiprocessing
import threading
import weakref
import sys
from concurrent.futures import _base
try:
import queue
except ImportError:
import Queue as queue
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
# Workers are created as daemon threads and processes. This is done to allow the
@ -79,11 +73,11 @@ _shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
items = list(_threads_queues.items()) if _threads_queues else ()
for t, q in items:
q.put(None)
for t, q in items:
t.join()
t.join(sys.maxint)
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
@ -132,7 +126,7 @@ def _process_worker(call_queue, result_queue):
return
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException:
except:
e = sys.exc_info()[1]
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
@ -220,6 +214,8 @@ def _queue_management_worker(executor_reference,
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
# Delete references to object. See issue16284
del work_item
# Check whether we should start shutting down.
executor = executor_reference()
# No more work items can be added if:
@ -266,6 +262,7 @@ def _check_system_limits():
_system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max
raise NotImplementedError(_system_limited)
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
@ -280,6 +277,9 @@ class ProcessPoolExecutor(_base.Executor):
if max_workers is None:
self._max_workers = multiprocessing.cpu_count()
else:
if max_workers <= 0:
raise ValueError("max_workers must be greater than 0")
self._max_workers = max_workers
# Make the call queue slightly larger than the number of processes to
@ -351,7 +351,7 @@ class ProcessPoolExecutor(_base.Executor):
# Wake up queue management thread
self._result_queue.put(None)
if wait:
self._queue_management_thread.join()
self._queue_management_thread.join(sys.maxint)
# To reduce the risk of opening too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None

View file

@ -3,18 +3,20 @@
"""Implements ThreadPoolExecutor."""
from __future__ import with_statement
import atexit
from concurrent.futures import _base
import itertools
import Queue as queue
import threading
import weakref
import sys
from concurrent.futures import _base
try:
import queue
from multiprocessing import cpu_count
except ImportError:
import Queue as queue
# some platforms don't have multiprocessing
def cpu_count():
return None
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
@ -38,11 +40,11 @@ _shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
items = list(_threads_queues.items())
items = list(_threads_queues.items()) if _threads_queues else ()
for t, q in items:
q.put(None)
for t, q in items:
t.join()
t.join(sys.maxint)
atexit.register(_python_exit)
@ -59,9 +61,9 @@ class _WorkItem(object):
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException:
e = sys.exc_info()[1]
self.future.set_exception(e)
except:
e, tb = sys.exc_info()[1:]
self.future.set_exception_info(e, tb)
else:
self.future.set_result(result)
@ -71,6 +73,8 @@ def _worker(executor_reference, work_queue):
work_item = work_queue.get(block=True)
if work_item is not None:
work_item.run()
# Delete references to object. See issue16284
del work_item
continue
executor = executor_reference()
# Exit if:
@ -82,22 +86,37 @@ def _worker(executor_reference, work_queue):
work_queue.put(None)
return
del executor
except BaseException:
except:
_base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers):
# Used to assign unique thread names when thread_name_prefix is not supplied.
_counter = itertools.count().next
def __init__(self, max_workers=None, thread_name_prefix=''):
    """Set up a new ThreadPoolExecutor.

    Args:
        max_workers: The maximum number of threads that can be used to
            execute the given calls. When None, it defaults to five
            times the value reported by cpu_count() (or five if
            cpu_count() reports nothing).
        thread_name_prefix: An optional name prefix to give our threads.
            When empty, a "ThreadPoolExecutor-<n>" prefix is generated
            from the class-level counter.

    Raises:
        ValueError: If max_workers is less than or equal to zero.
    """
    if max_workers is None:
        # Use this number because ThreadPoolExecutor is often
        # used to overlap I/O instead of CPU work.
        detected_cpus = cpu_count() or 1
        max_workers = detected_cpus * 5
    if max_workers <= 0:
        raise ValueError("max_workers must be greater than 0")

    self._max_workers = max_workers
    self._work_queue = queue.Queue()
    self._threads = set()
    self._shutdown = False
    self._shutdown_lock = threading.Lock()

    # Only draw a number from the shared counter when the caller did not
    # provide a prefix of their own.
    if thread_name_prefix:
        self._thread_name_prefix = thread_name_prefix
    else:
        self._thread_name_prefix = (
            "ThreadPoolExecutor-%d" % self._counter())
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
@ -119,8 +138,11 @@ class ThreadPoolExecutor(_base.Executor):
q.put(None)
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
num_threads = len(self._threads)
if num_threads < self._max_workers:
thread_name = '%s_%d' % (self._thread_name_prefix or self,
num_threads)
t = threading.Thread(name=thread_name, target=_worker,
args=(weakref.ref(self, weakref_cb),
self._work_queue))
t.daemon = True
@ -134,5 +156,5 @@ class ThreadPoolExecutor(_base.Executor):
self._work_queue.put(None)
if wait:
for t in self._threads:
t.join()
t.join(sys.maxint)
shutdown.__doc__ = _base.Executor.shutdown.__doc__