From 49c9ea13504a16e5adfa18f4927dfa5a39847522 Mon Sep 17 00:00:00 2001 From: Labrys of Knossos Date: Sun, 16 Dec 2018 10:24:14 -0500 Subject: [PATCH] Update futures to 3.2.0 --- libs/concurrent/futures/__init__.py | 7 +- libs/concurrent/futures/_base.py | 205 ++++++++++++++++++++-------- libs/concurrent/futures/_compat.py | 101 -------------- libs/concurrent/futures/process.py | 24 ++-- libs/concurrent/futures/thread.py | 52 +++++-- 5 files changed, 204 insertions(+), 185 deletions(-) delete mode 100644 libs/concurrent/futures/_compat.py diff --git a/libs/concurrent/futures/__init__.py b/libs/concurrent/futures/__init__.py index b5231f8a..428b14bd 100644 --- a/libs/concurrent/futures/__init__.py +++ b/libs/concurrent/futures/__init__.py @@ -14,5 +14,10 @@ from concurrent.futures._base import (FIRST_COMPLETED, Executor, wait, as_completed) -from concurrent.futures.process import ProcessPoolExecutor from concurrent.futures.thread import ThreadPoolExecutor + +try: + from concurrent.futures.process import ProcessPoolExecutor +except ImportError: + # some platforms don't have multiprocessing + pass diff --git a/libs/concurrent/futures/_base.py b/libs/concurrent/futures/_base.py index 8ed69b7d..510ffa53 100644 --- a/libs/concurrent/futures/_base.py +++ b/libs/concurrent/futures/_base.py @@ -1,15 +1,12 @@ # Copyright 2009 Brian Quinlan. All Rights Reserved. # Licensed to PSF under a Contributor Agreement. -from __future__ import with_statement +import collections import logging import threading +import itertools import time - -try: - from collections import namedtuple -except ImportError: - from concurrent.futures._compat import namedtuple +import types __author__ = 'Brian Quinlan (brian@sweetapp.com)' @@ -175,6 +172,29 @@ def _create_and_install_waiters(fs, return_when): return waiter + +def _yield_finished_futures(fs, waiter, ref_collect): + """ + Iterate on the list *fs*, yielding finished futures one by one in + reverse order. + Before yielding a future, *waiter* is removed from its waiters + and the future is removed from each set in the collection of sets + *ref_collect*. + + The aim of this function is to avoid keeping stale references after + the future is yielded and before the iterator resumes. + """ + while fs: + f = fs[-1] + for futures_set in ref_collect: + futures_set.remove(f) + with f._condition: + f._waiters.remove(waiter) + del f + # Careful not to keep a reference to the popped value + yield fs.pop() + + def as_completed(fs, timeout=None): """An iterator over the given futures that yields each as it completes. @@ -186,7 +206,8 @@ def as_completed(fs, timeout=None): Returns: An iterator that yields the given Futures as they complete (finished or - cancelled). + cancelled). If any given Futures are duplicated, they will be returned + once. Raises: TimeoutError: If the entire result iterator could not be generated @@ -195,16 +216,20 @@ def as_completed(fs, timeout=None): if timeout is not None: end_time = timeout + time.time() + fs = set(fs) + total_futures = len(fs) with _AcquireFutures(fs): finished = set( f for f in fs if f._state in [CANCELLED_AND_NOTIFIED, FINISHED]) - pending = set(fs) - finished + pending = fs - finished waiter = _create_and_install_waiters(fs, _AS_COMPLETED) - + finished = list(finished) try: - for future in finished: - yield future + for f in _yield_finished_futures(finished, waiter, + ref_collect=(fs,)): + f = [f] + yield f.pop() while pending: if timeout is None: @@ -214,7 +239,7 @@ def as_completed(fs, timeout=None): if wait_timeout < 0: raise TimeoutError( '%d (of %d) futures unfinished' % ( - len(pending), len(fs))) + len(pending), total_futures)) waiter.event.wait(wait_timeout) @@ -223,15 +248,20 @@ def as_completed(fs, timeout=None): waiter.finished_futures = [] waiter.event.clear() - for future in finished: - yield future - pending.remove(future) + # reverse to keep finishing order + finished.reverse() + for f in _yield_finished_futures(finished, waiter, + ref_collect=(fs, pending)): + f = [f] + yield f.pop() finally: + # Remove waiter from unfinished futures for f in fs: - f._waiters.remove(waiter) + with f._condition: + f._waiters.remove(waiter) -DoneAndNotDoneFutures = namedtuple( +DoneAndNotDoneFutures = collections.namedtuple( 'DoneAndNotDoneFutures', 'done not_done') def wait(fs, timeout=None, return_when=ALL_COMPLETED): """Wait for the futures in the given sequence to complete. @@ -276,7 +306,8 @@ def wait(fs, timeout=None, return_when=ALL_COMPLETED): waiter.event.wait(timeout) for f in fs: - f._waiters.remove(waiter) + with f._condition: + f._waiters.remove(waiter) done.update(waiter.finished_futures) return DoneAndNotDoneFutures(done, set(fs) - done) @@ -290,6 +321,7 @@ class Future(object): self._state = PENDING self._result = None self._exception = None + self._traceback = None self._waiters = [] self._done_callbacks = [] @@ -299,22 +331,41 @@ class Future(object): callback(self) except Exception: LOGGER.exception('exception calling callback for %r', self) + except BaseException: + # Explicitly let all other new-style exceptions through so + # that we can catch all old-style exceptions with a simple + # "except:" clause below. + # + # All old-style exception objects are instances of + # types.InstanceType, but "except types.InstanceType:" does + # not catch old-style exceptions for some reason. Thus, the + # only way to catch all old-style exceptions without catching + # any new-style exceptions is to filter out the new-style + # exceptions, which all derive from BaseException. + raise + except: + # Because of the BaseException clause above, this handler only + # executes for old-style exception objects. + LOGGER.exception('exception calling callback for %r', self) def __repr__(self): with self._condition: if self._state == FINISHED: if self._exception: - return '' % ( - hex(id(self)), + return '<%s at %#x state=%s raised %s>' % ( + self.__class__.__name__, + id(self), _STATE_TO_DESCRIPTION_MAP[self._state], self._exception.__class__.__name__) else: - return '' % ( - hex(id(self)), + return '<%s at %#x state=%s returned %s>' % ( + self.__class__.__name__, + id(self), _STATE_TO_DESCRIPTION_MAP[self._state], self._result.__class__.__name__) - return '' % ( - hex(id(self)), + return '<%s at %#x state=%s>' % ( + self.__class__.__name__, + id(self), _STATE_TO_DESCRIPTION_MAP[self._state]) def cancel(self): @@ -337,7 +388,7 @@ class Future(object): return True def cancelled(self): - """Return True if the future has cancelled.""" + """Return True if the future was cancelled.""" with self._condition: return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED] @@ -353,7 +404,14 @@ class Future(object): def __get_result(self): if self._exception: - raise self._exception + if isinstance(self._exception, types.InstanceType): + # The exception is an instance of an old-style class, which + # means type(self._exception) returns types.ClassType instead + # of the exception's actual class type. + exception_type = self._exception.__class__ + else: + exception_type = type(self._exception) + raise exception_type, self._exception, self._traceback else: return self._result @@ -405,6 +463,39 @@ class Future(object): else: raise TimeoutError() + def exception_info(self, timeout=None): + """Return a tuple of (exception, traceback) raised by the call that the + future represents. + + Args: + timeout: The number of seconds to wait for the exception if the + future isn't done. If None, then there is no limit on the wait + time. + + Returns: + The exception raised by the call that the future represents or None + if the call completed without raising. + + Raises: + CancelledError: If the future was cancelled. + TimeoutError: If the future didn't finish executing before the given + timeout. + """ + with self._condition: + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception, self._traceback + + self._condition.wait(timeout) + + if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: + raise CancelledError() + elif self._state == FINISHED: + return self._exception, self._traceback + else: + raise TimeoutError() + def exception(self, timeout=None): """Return the exception raised by the call that the future represents. @@ -422,21 +513,7 @@ class Future(object): TimeoutError: If the future didn't finish executing before the given timeout. """ - - with self._condition: - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - - self._condition.wait(timeout) - - if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]: - raise CancelledError() - elif self._state == FINISHED: - return self._exception - else: - raise TimeoutError() + return self.exception_info(timeout)[0] # The following methods should only be used by Executors and in tests. def set_running_or_notify_cancel(self): @@ -475,8 +552,8 @@ class Future(object): return True else: LOGGER.critical('Future %s in unexpected state: %s', - id(self.future), - self.future._state) + id(self), + self._state) raise RuntimeError('Future in unexpected state') def set_result(self, result): @@ -492,19 +569,28 @@ class Future(object): self._condition.notify_all() self._invoke_callbacks() - def set_exception(self, exception): - """Sets the result of the future as being the given exception. + def set_exception_info(self, exception, traceback): + """Sets the result of the future as being the given exception + and traceback. Should only be used by Executor implementations and unit tests. """ with self._condition: self._exception = exception + self._traceback = traceback self._state = FINISHED for waiter in self._waiters: waiter.add_exception(self) self._condition.notify_all() self._invoke_callbacks() + def set_exception(self, exception): + """Sets the result of the future as being the given exception. + + Should only be used by Executor implementations and unit tests. + """ + self.set_exception_info(exception, None) + class Executor(object): """This is an abstract base class for concrete asynchronous executors.""" @@ -520,7 +606,7 @@ class Executor(object): raise NotImplementedError() def map(self, fn, *iterables, **kwargs): - """Returns a iterator equivalent to map(fn, iter). + """Returns an iterator equivalent to map(fn, iter). Args: fn: A callable that will take as many arguments as there are @@ -541,17 +627,24 @@ class Executor(object): if timeout is not None: end_time = timeout + time.time() - fs = [self.submit(fn, *args) for args in zip(*iterables)] + fs = [self.submit(fn, *args) for args in itertools.izip(*iterables)] - try: - for future in fs: - if timeout is None: - yield future.result() - else: - yield future.result(end_time - time.time()) - finally: - for future in fs: - future.cancel() + # Yield must be hidden in closure so that the futures are submitted + # before the first iterator value is required. + def result_iterator(): + try: + # reverse to keep finishing order + fs.reverse() + while fs: + # Careful not to keep a reference to the popped future + if timeout is None: + yield fs.pop().result() + else: + yield fs.pop().result(end_time - time.time()) + finally: + for future in fs: + future.cancel() + return result_iterator() def shutdown(self, wait=True): """Clean-up the resources associated with the Executor. diff --git a/libs/concurrent/futures/_compat.py b/libs/concurrent/futures/_compat.py deleted file mode 100644 index 11462326..00000000 --- a/libs/concurrent/futures/_compat.py +++ /dev/null @@ -1,101 +0,0 @@ -from keyword import iskeyword as _iskeyword -from operator import itemgetter as _itemgetter -import sys as _sys - - -def namedtuple(typename, field_names): - """Returns a new subclass of tuple with named fields. - - >>> Point = namedtuple('Point', 'x y') - >>> Point.__doc__ # docstring for the new class - 'Point(x, y)' - >>> p = Point(11, y=22) # instantiate with positional args or keywords - >>> p[0] + p[1] # indexable like a plain tuple - 33 - >>> x, y = p # unpack like a regular tuple - >>> x, y - (11, 22) - >>> p.x + p.y # fields also accessable by name - 33 - >>> d = p._asdict() # convert to a dictionary - >>> d['x'] - 11 - >>> Point(**d) # convert from a dictionary - Point(x=11, y=22) - >>> p._replace(x=100) # _replace() is like str.replace() but targets named fields - Point(x=100, y=22) - - """ - - # Parse and validate the field names. Validation serves two purposes, - # generating informative error messages and preventing template injection attacks. - if isinstance(field_names, basestring): - field_names = field_names.replace(',', ' ').split() # names separated by whitespace and/or commas - field_names = tuple(map(str, field_names)) - for name in (typename,) + field_names: - if not all(c.isalnum() or c=='_' for c in name): - raise ValueError('Type names and field names can only contain alphanumeric characters and underscores: %r' % name) - if _iskeyword(name): - raise ValueError('Type names and field names cannot be a keyword: %r' % name) - if name[0].isdigit(): - raise ValueError('Type names and field names cannot start with a number: %r' % name) - seen_names = set() - for name in field_names: - if name.startswith('_'): - raise ValueError('Field names cannot start with an underscore: %r' % name) - if name in seen_names: - raise ValueError('Encountered duplicate field name: %r' % name) - seen_names.add(name) - - # Create and fill-in the class template - numfields = len(field_names) - argtxt = repr(field_names).replace("'", "")[1:-1] # tuple repr without parens or quotes - reprtxt = ', '.join('%s=%%r' % name for name in field_names) - dicttxt = ', '.join('%r: t[%d]' % (name, pos) for pos, name in enumerate(field_names)) - template = '''class %(typename)s(tuple): - '%(typename)s(%(argtxt)s)' \n - __slots__ = () \n - _fields = %(field_names)r \n - def __new__(_cls, %(argtxt)s): - return _tuple.__new__(_cls, (%(argtxt)s)) \n - @classmethod - def _make(cls, iterable, new=tuple.__new__, len=len): - 'Make a new %(typename)s object from a sequence or iterable' - result = new(cls, iterable) - if len(result) != %(numfields)d: - raise TypeError('Expected %(numfields)d arguments, got %%d' %% len(result)) - return result \n - def __repr__(self): - return '%(typename)s(%(reprtxt)s)' %% self \n - def _asdict(t): - 'Return a new dict which maps field names to their values' - return {%(dicttxt)s} \n - def _replace(_self, **kwds): - 'Return a new %(typename)s object replacing specified fields with new values' - result = _self._make(map(kwds.pop, %(field_names)r, _self)) - if kwds: - raise ValueError('Got unexpected field names: %%r' %% kwds.keys()) - return result \n - def __getnewargs__(self): - return tuple(self) \n\n''' % locals() - for i, name in enumerate(field_names): - template += ' %s = _property(_itemgetter(%d))\n' % (name, i) - - # Execute the template string in a temporary namespace and - # support tracing utilities by setting a value for frame.f_globals['__name__'] - namespace = dict(_itemgetter=_itemgetter, __name__='namedtuple_%s' % typename, - _property=property, _tuple=tuple) - try: - exec(template, namespace) - except SyntaxError: - e = _sys.exc_info()[1] - raise SyntaxError(e.message + ':\n' + template) - result = namespace[typename] - - # For pickling to work, the __module__ variable needs to be set to the frame - # where the named tuple is created. Bypass this step in enviroments where - # sys._getframe is not defined (Jython for example). - if hasattr(_sys, '_getframe'): - result.__module__ = _sys._getframe(1).f_globals.get('__name__', '__main__') - - return result diff --git a/libs/concurrent/futures/process.py b/libs/concurrent/futures/process.py index 98684f8e..fa5b96fd 100644 --- a/libs/concurrent/futures/process.py +++ b/libs/concurrent/futures/process.py @@ -43,20 +43,14 @@ Process #1..n: _ResultItems in "Request Q" """ -from __future__ import with_statement import atexit +from concurrent.futures import _base +import Queue as queue import multiprocessing import threading import weakref import sys -from concurrent.futures import _base - -try: - import queue -except ImportError: - import Queue as queue - __author__ = 'Brian Quinlan (brian@sweetapp.com)' # Workers are created as daemon threads and processes. This is done to allow the @@ -79,11 +73,11 @@ _shutdown = False def _python_exit(): global _shutdown _shutdown = True - items = list(_threads_queues.items()) + items = list(_threads_queues.items()) if _threads_queues else () for t, q in items: q.put(None) for t, q in items: - t.join() + t.join(sys.maxint) # Controls how many more calls than processes will be queued in the call queue. # A smaller number will mean that processes spend more time idle waiting for @@ -132,7 +126,7 @@ def _process_worker(call_queue, result_queue): return try: r = call_item.fn(*call_item.args, **call_item.kwargs) - except BaseException: + except: e = sys.exc_info()[1] result_queue.put(_ResultItem(call_item.work_id, exception=e)) @@ -220,6 +214,8 @@ def _queue_management_worker(executor_reference, work_item.future.set_exception(result_item.exception) else: work_item.future.set_result(result_item.result) + # Delete references to object. See issue16284 + del work_item # Check whether we should start shutting down. executor = executor_reference() # No more work items can be added if: @@ -266,6 +262,7 @@ def _check_system_limits(): _system_limited = "system provides too few semaphores (%d available, 256 necessary)" % nsems_max raise NotImplementedError(_system_limited) + class ProcessPoolExecutor(_base.Executor): def __init__(self, max_workers=None): """Initializes a new ProcessPoolExecutor instance. @@ -280,6 +277,9 @@ class ProcessPoolExecutor(_base.Executor): if max_workers is None: self._max_workers = multiprocessing.cpu_count() else: + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + self._max_workers = max_workers # Make the call queue slightly larger than the number of processes to @@ -351,7 +351,7 @@ class ProcessPoolExecutor(_base.Executor): # Wake up queue management thread self._result_queue.put(None) if wait: - self._queue_management_thread.join() + self._queue_management_thread.join(sys.maxint) # To reduce the risk of openning too many files, remove references to # objects that use file descriptors. self._queue_management_thread = None diff --git a/libs/concurrent/futures/thread.py b/libs/concurrent/futures/thread.py index a45959d3..bb0ce9d7 100644 --- a/libs/concurrent/futures/thread.py +++ b/libs/concurrent/futures/thread.py @@ -3,18 +3,20 @@ """Implements ThreadPoolExecutor.""" -from __future__ import with_statement import atexit +from concurrent.futures import _base +import itertools +import Queue as queue import threading import weakref import sys -from concurrent.futures import _base - try: - import queue + from multiprocessing import cpu_count except ImportError: - import Queue as queue + # some platforms don't have multiprocessing + def cpu_count(): + return None __author__ = 'Brian Quinlan (brian@sweetapp.com)' @@ -38,11 +40,11 @@ _shutdown = False def _python_exit(): global _shutdown _shutdown = True - items = list(_threads_queues.items()) + items = list(_threads_queues.items()) if _threads_queues else () for t, q in items: q.put(None) for t, q in items: - t.join() + t.join(sys.maxint) atexit.register(_python_exit) @@ -59,9 +61,9 @@ class _WorkItem(object): try: result = self.fn(*self.args, **self.kwargs) - except BaseException: - e = sys.exc_info()[1] - self.future.set_exception(e) + except: + e, tb = sys.exc_info()[1:] + self.future.set_exception_info(e, tb) else: self.future.set_result(result) @@ -71,6 +73,8 @@ def _worker(executor_reference, work_queue): work_item = work_queue.get(block=True) if work_item is not None: work_item.run() + # Delete references to object. See issue16284 + del work_item continue executor = executor_reference() # Exit if: @@ -82,22 +86,37 @@ def _worker(executor_reference, work_queue): work_queue.put(None) return del executor - except BaseException: + except: _base.LOGGER.critical('Exception in worker', exc_info=True) + class ThreadPoolExecutor(_base.Executor): - def __init__(self, max_workers): + + # Used to assign unique thread names when thread_name_prefix is not supplied. + _counter = itertools.count().next + + def __init__(self, max_workers=None, thread_name_prefix=''): """Initializes a new ThreadPoolExecutor instance. Args: max_workers: The maximum number of threads that can be used to execute the given calls. + thread_name_prefix: An optional name prefix to give our threads. """ + if max_workers is None: + # Use this number because ThreadPoolExecutor is often + # used to overlap I/O instead of CPU work. + max_workers = (cpu_count() or 1) * 5 + if max_workers <= 0: + raise ValueError("max_workers must be greater than 0") + self._max_workers = max_workers self._work_queue = queue.Queue() self._threads = set() self._shutdown = False self._shutdown_lock = threading.Lock() + self._thread_name_prefix = (thread_name_prefix or + ("ThreadPoolExecutor-%d" % self._counter())) def submit(self, fn, *args, **kwargs): with self._shutdown_lock: @@ -119,8 +138,11 @@ class ThreadPoolExecutor(_base.Executor): q.put(None) # TODO(bquinlan): Should avoid creating new threads if there are more # idle threads than items in the work queue. - if len(self._threads) < self._max_workers: - t = threading.Thread(target=_worker, + num_threads = len(self._threads) + if num_threads < self._max_workers: + thread_name = '%s_%d' % (self._thread_name_prefix or self, + num_threads) + t = threading.Thread(name=thread_name, target=_worker, args=(weakref.ref(self, weakref_cb), self._work_queue)) t.daemon = True @@ -134,5 +156,5 @@ class ThreadPoolExecutor(_base.Executor): self._work_queue.put(None) if wait: for t in self._threads: - t.join() + t.join(sys.maxint) shutdown.__doc__ = _base.Executor.shutdown.__doc__