diff --git a/lib/tqdm/__init__.py b/lib/tqdm/__init__.py deleted file mode 100644 index d6b251fe..00000000 --- a/lib/tqdm/__init__.py +++ /dev/null @@ -1,34 +0,0 @@ -from ._tqdm import tqdm -from ._tqdm import trange -from ._tqdm_gui import tqdm_gui -from ._tqdm_gui import tgrange -from ._tqdm_pandas import tqdm_pandas -from ._main import main -from ._monitor import TMonitor, TqdmSynchronisationWarning -from ._version import __version__ # NOQA -from ._tqdm import TqdmTypeError, TqdmKeyError, TqdmWarning, \ - TqdmDeprecationWarning, TqdmExperimentalWarning, \ - TqdmMonitorWarning - -__all__ = ['tqdm', 'tqdm_gui', 'trange', 'tgrange', 'tqdm_pandas', - 'tqdm_notebook', 'tnrange', 'main', 'TMonitor', - 'TqdmTypeError', 'TqdmKeyError', - 'TqdmWarning', 'TqdmDeprecationWarning', - 'TqdmExperimentalWarning', - 'TqdmMonitorWarning', 'TqdmSynchronisationWarning', - '__version__'] - - -def tqdm_notebook(*args, **kwargs): # pragma: no cover - """See tqdm._tqdm_notebook.tqdm_notebook for full documentation""" - from ._tqdm_notebook import tqdm_notebook as _tqdm_notebook - return _tqdm_notebook(*args, **kwargs) - - -def tnrange(*args, **kwargs): # pragma: no cover - """ - A shortcut for tqdm_notebook(xrange(*args), **kwargs). - On Python3+ range is used instead of xrange. - """ - from ._tqdm_notebook import tnrange as _tnrange - return _tnrange(*args, **kwargs) diff --git a/lib/tqdm/__main__.py b/lib/tqdm/__main__.py deleted file mode 100644 index be7e86a5..00000000 --- a/lib/tqdm/__main__.py +++ /dev/null @@ -1,2 +0,0 @@ -from ._main import main -main() diff --git a/lib/tqdm/_main.py b/lib/tqdm/_main.py deleted file mode 100644 index 923324bd..00000000 --- a/lib/tqdm/_main.py +++ /dev/null @@ -1,207 +0,0 @@ -from ._tqdm import tqdm, TqdmTypeError, TqdmKeyError -from ._version import __version__ # NOQA -import sys -import re -import logging -__all__ = ["main"] - - -def cast(val, typ): - log = logging.getLogger(__name__) - log.debug((val, typ)) - if " or " in typ: - for t in typ.split(" or "): - try: - return cast(val, t) - except TqdmTypeError: - pass - raise TqdmTypeError(val + ' : ' + typ) - - # sys.stderr.write('\ndebug | `val:type`: `' + val + ':' + typ + '`.\n') - if typ == 'bool': - if (val == 'True') or (val == ''): - return True - elif val == 'False': - return False - else: - raise TqdmTypeError(val + ' : ' + typ) - try: - return eval(typ + '("' + val + '")') - except: - if typ == 'chr': - return chr(ord(eval('"' + val + '"'))) - else: - raise TqdmTypeError(val + ' : ' + typ) - - -def posix_pipe(fin, fout, delim='\n', buf_size=256, - callback=lambda int: None # pragma: no cover - ): - """ - Params - ------ - fin : file with `read(buf_size : int)` method - fout : file with `write` (and optionally `flush`) methods. - callback : function(int), e.g.: `tqdm.update` - """ - fp_write = fout.write - - # tmp = '' - if not delim: - while True: - tmp = fin.read(buf_size) - - # flush at EOF - if not tmp: - getattr(fout, 'flush', lambda: None)() # pragma: no cover - return - - fp_write(tmp) - callback(len(tmp)) - # return - - buf = '' - # n = 0 - while True: - tmp = fin.read(buf_size) - - # flush at EOF - if not tmp: - if buf: - fp_write(buf) - callback(1 + buf.count(delim)) # n += 1 + buf.count(delim) - getattr(fout, 'flush', lambda: None)() # pragma: no cover - return # n - - while True: - try: - i = tmp.index(delim) - except ValueError: - buf += tmp - break - else: - fp_write(buf + tmp[:i + len(delim)]) - callback(1) # n += 1 - buf = '' - tmp = tmp[i + len(delim):] - - -# ((opt, type), ... ) -RE_OPTS = re.compile(r'\n {8}(\S+)\s{2,}:\s*([^,]+)') -# better split method assuming no positional args -RE_SHLEX = re.compile(r'\s*(? : \2', d) - split = RE_OPTS.split(d) - opt_types_desc = zip(split[1::3], split[2::3], split[3::3]) - d = ''.join('\n --{0}=<{0}> : {1}{2}'.format(*otd) - for otd in opt_types_desc if otd[0] not in UNSUPPORTED_OPTS) - - d = """Usage: - tqdm [--help | options] - -Options: - -h, --help Print this help and exit - -v, --version Print version and exit - -""" + d.strip('\n') + '\n' - - # opts = docopt(d, version=__version__) - if any(v in sys.argv for v in ('-v', '--version')): - sys.stdout.write(__version__ + '\n') - sys.exit(0) - elif any(v in sys.argv for v in ('-h', '--help')): - sys.stdout.write(d + '\n') - sys.exit(0) - - argv = RE_SHLEX.split(' '.join(["tqdm"] + sys.argv[1:])) - opts = dict(zip(argv[1::2], argv[2::2])) - - log.debug(opts) - opts.pop('log', True) - - tqdm_args = {'file': fp} - try: - for (o, v) in opts.items(): - try: - tqdm_args[o] = cast(v, opt_types[o]) - except KeyError as e: - raise TqdmKeyError(str(e)) - log.debug('args:' + str(tqdm_args)) - except: - fp.write('\nError:\nUsage:\n tqdm [--help | options]\n') - for i in sys.stdin: - sys.stdout.write(i) - raise - else: - buf_size = tqdm_args.pop('buf_size', 256) - delim = tqdm_args.pop('delim', '\n') - delim_per_char = tqdm_args.pop('bytes', False) - if delim_per_char: - tqdm_args.setdefault('unit', 'B') - tqdm_args.setdefault('unit_scale', True) - tqdm_args.setdefault('unit_divisor', 1024) - log.debug(tqdm_args) - with tqdm(**tqdm_args) as t: - posix_pipe(sys.stdin, sys.stdout, - '', buf_size, t.update) - elif delim == '\n': - log.debug(tqdm_args) - for i in tqdm(sys.stdin, **tqdm_args): - sys.stdout.write(i) - else: - log.debug(tqdm_args) - with tqdm(**tqdm_args) as t: - posix_pipe(sys.stdin, sys.stdout, - delim, buf_size, t.update) diff --git a/lib/tqdm/_monitor.py b/lib/tqdm/_monitor.py deleted file mode 100644 index 534acd74..00000000 --- a/lib/tqdm/_monitor.py +++ /dev/null @@ -1,93 +0,0 @@ -from threading import Event, Thread -from time import time -from warnings import warn -__all__ = ["TMonitor", "TqdmSynchronisationWarning"] - - -class TqdmSynchronisationWarning(RuntimeWarning): - """tqdm multi-thread/-process errors which may cause incorrect nesting - but otherwise no adverse effects""" - pass - - -class TMonitor(Thread): - """ - Monitoring thread for tqdm bars. - Monitors if tqdm bars are taking too much time to display - and readjusts miniters automatically if necessary. - - Parameters - ---------- - tqdm_cls : class - tqdm class to use (can be core tqdm or a submodule). - sleep_interval : fload - Time to sleep between monitoring checks. - """ - - # internal vars for unit testing - _time = None - _event = None - - def __init__(self, tqdm_cls, sleep_interval): - Thread.__init__(self) - self.daemon = True # kill thread when main killed (KeyboardInterrupt) - self.was_killed = Event() - self.woken = 0 # last time woken up, to sync with monitor - self.tqdm_cls = tqdm_cls - self.sleep_interval = sleep_interval - if TMonitor._time is not None: - self._time = TMonitor._time - else: - self._time = time - if TMonitor._event is not None: - self._event = TMonitor._event - else: - self._event = Event - self.start() - - def exit(self): - self.was_killed.set() - self.join() - return self.report() - - def run(self): - cur_t = self._time() - while True: - # After processing and before sleeping, notify that we woke - # Need to be done just before sleeping - self.woken = cur_t - # Sleep some time... - self.was_killed.wait(self.sleep_interval) - # Quit if killed - if self.was_killed.is_set(): - return - # Then monitor! - # Acquire lock (to access _instances) - with self.tqdm_cls.get_lock(): - cur_t = self._time() - # Check tqdm instances are waiting too long to print - instances = self.tqdm_cls._instances.copy() - for instance in instances: - # Check event in loop to reduce blocking time on exit - if self.was_killed.is_set(): - return - # Avoid race by checking that the instance started - if not hasattr(instance, 'start_t'): # pragma: nocover - continue - # Only if mininterval > 1 (else iterations are just slow) - # and last refresh exceeded maxinterval - if instance.miniters > 1 and \ - (cur_t - instance.last_print_t) >= \ - instance.maxinterval: - # force bypassing miniters on next iteration - # (dynamic_miniters adjusts mininterval automatically) - instance.miniters = 1 - # Refresh now! (works only for manual tqdm) - instance.refresh(nolock=True) - if instances != self.tqdm_cls._instances: # pragma: nocover - warn("Set changed size during iteration" + - " (see https://github.com/tqdm/tqdm/issues/481)", - TqdmSynchronisationWarning) - - def report(self): - return not self.was_killed.is_set() diff --git a/lib/tqdm/_tqdm.py b/lib/tqdm/_tqdm.py deleted file mode 100644 index 0eafbf0a..00000000 --- a/lib/tqdm/_tqdm.py +++ /dev/null @@ -1,1223 +0,0 @@ -""" -Customisable progressbar decorator for iterators. -Includes a default (x)range iterator printing to stderr. - -Usage: - >>> from tqdm import trange[, tqdm] - >>> for i in trange(10): #same as: for i in tqdm(xrange(10)) - ... ... -""" -from __future__ import absolute_import -# integer division / : float, // : int -from __future__ import division -# compatibility functions and utilities -from ._utils import _supports_unicode, _environ_cols_wrapper, _range, _unich, \ - _term_move_up, _unicode, WeakSet, _basestring, _OrderedDict -from ._monitor import TMonitor -# native libraries -import sys -from numbers import Number -from time import time -from contextlib import contextmanager -# For parallelism safety -import multiprocessing as mp -import threading as th -from warnings import warn - -__author__ = {"github.com/": ["noamraph", "obiwanus", "kmike", "hadim", - "casperdcl", "lrq3000"]} -__all__ = ['tqdm', 'trange', - 'TqdmTypeError', 'TqdmKeyError', 'TqdmWarning', - 'TqdmExperimentalWarning', 'TqdmDeprecationWarning', - 'TqdmMonitorWarning'] - - -class TqdmTypeError(TypeError): - pass - - -class TqdmKeyError(KeyError): - pass - - -class TqdmWarning(Warning): - """base class for all tqdm warnings. - - Used for non-external-code-breaking errors, such as garbled printing. - """ - def __init__(self, msg, fp_write=None, *a, **k): - if fp_write is not None: - fp_write("\n" + self.__class__.__name__ + ": " + str(msg).rstrip() + '\n') - else: - super(TqdmWarning, self).__init__(msg, *a, **k) - -class TqdmExperimentalWarning(TqdmWarning, FutureWarning): - """beta feature, unstable API and behaviour""" - pass - -class TqdmDeprecationWarning(TqdmWarning, DeprecationWarning): - # not suppressed if raised - pass - - -class TqdmMonitorWarning(TqdmWarning, RuntimeWarning): - """tqdm monitor errors which do not affect external functionality""" - pass - - -# Create global parallelism locks to avoid racing issues with parallel bars -# works only if fork available (Linux, MacOSX, but not on Windows) -try: - mp_lock = mp.RLock() # multiprocessing lock -except ImportError: # pragma: no cover - mp_lock = None -except OSError: # pragma: no cover - mp_lock = None -try: - th_lock = th.RLock() # thread lock -except OSError: # pragma: no cover - th_lock = None - - -class TqdmDefaultWriteLock(object): - """ - Provide a default write lock for thread and multiprocessing safety. - Works only on platforms supporting `fork` (so Windows is excluded). - On Windows, you need to supply the lock from the parent to the children as - an argument to joblib or the parallelism lib you use. - """ - def __init__(self): - global mp_lock, th_lock - self.locks = [lk for lk in [mp_lock, th_lock] if lk is not None] - - def acquire(self): - for lock in self.locks: - lock.acquire() - - def release(self): - for lock in self.locks[::-1]: # Release in inverse order of acquisition - lock.release() - - def __enter__(self): - self.acquire() - - def __exit__(self, *exc): - self.release() - - -class tqdm(object): - """ - Decorate an iterable object, returning an iterator which acts exactly - like the original iterable, but prints a dynamically updating - progressbar every time a value is requested. - """ - - monitor_interval = 10 # set to 0 to disable the thread - monitor = None - _lock = TqdmDefaultWriteLock() - - @staticmethod - def format_sizeof(num, suffix='', divisor=1000): - """ - Formats a number (greater than unity) with SI Order of Magnitude - prefixes. - - Parameters - ---------- - num : float - Number ( >= 1) to format. - suffix : str, optional - Post-postfix [default: '']. - divisor : float, optionl - Divisor between prefixes [default: 1000]. - - Returns - ------- - out : str - Number with Order of Magnitude SI unit postfix. - """ - for unit in ['', 'k', 'M', 'G', 'T', 'P', 'E', 'Z']: - if abs(num) < 999.95: - if abs(num) < 99.95: - if abs(num) < 9.995: - return '{0:1.2f}'.format(num) + unit + suffix - return '{0:2.1f}'.format(num) + unit + suffix - return '{0:3.0f}'.format(num) + unit + suffix - num /= divisor - return '{0:3.1f}Y'.format(num) + suffix - - @staticmethod - def format_interval(t): - """ - Formats a number of seconds as a clock time, [H:]MM:SS - - Parameters - ---------- - t : int - Number of seconds. - Returns - ------- - out : str - [H:]MM:SS - """ - mins, s = divmod(int(t), 60) - h, m = divmod(mins, 60) - if h: - return '{0:d}:{1:02d}:{2:02d}'.format(h, m, s) - else: - return '{0:02d}:{1:02d}'.format(m, s) - - @staticmethod - def status_printer(file): - """ - Manage the printing and in-place updating of a line of characters. - Note that if the string is longer than a line, then in-place - updating may not work (it will print a new line at each refresh). - """ - fp = file - fp_flush = getattr(fp, 'flush', lambda: None) # pragma: no cover - - def fp_write(s): - fp.write(_unicode(s)) - fp_flush() - - last_len = [0] - - def print_status(s): - len_s = len(s) - fp_write('\r' + s + (' ' * max(last_len[0] - len_s, 0))) - last_len[0] = len_s - - return print_status - - @staticmethod - def format_meter(n, total, elapsed, ncols=None, prefix='', ascii=False, - unit='it', unit_scale=False, rate=None, bar_format=None, - postfix=None, unit_divisor=1000): - """ - Return a string-based progress bar given some parameters - - Parameters - ---------- - n : int - Number of finished iterations. - total : int - The expected total number of iterations. If meaningless (), only - basic progress statistics are displayed (no ETA). - elapsed : float - Number of seconds passed since start. - ncols : int, optional - The width of the entire output message. If specified, - dynamically resizes the progress meter to stay within this bound - [default: None]. The fallback meter width is 10 for the progress - bar + no limit for the iterations counter and statistics. If 0, - will not print any meter (only stats). - prefix : str, optional - Prefix message (included in total width) [default: '']. - Use as {desc} in bar_format string. - ascii : bool, optional - If not set, use unicode (smooth blocks) to fill the meter - [default: False]. The fallback is to use ASCII characters - (1-9 #). - unit : str, optional - The iteration unit [default: 'it']. - unit_scale : bool or int or float, optional - If 1 or True, the number of iterations will be printed with an - appropriate SI metric prefix (k = 10^3, M = 10^6, etc.) - [default: False]. If any other non-zero number, will scale - `total` and `n`. - rate : float, optional - Manual override for iteration rate. - If [default: None], uses n/elapsed. - bar_format : str, optional - Specify a custom bar string formatting. May impact performance. - [default: '{l_bar}{bar}{r_bar}'], where - l_bar='{desc}: {percentage:3.0f}%|' and - r_bar='| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, ' - '{rate_fmt}{postfix}]' - Possible vars: l_bar, bar, r_bar, n, n_fmt, total, total_fmt, - percentage, rate, rate_fmt, rate_noinv, rate_noinv_fmt, - rate_inv, rate_inv_fmt, elapsed, remaining, desc, postfix. - Note that a trailing ": " is automatically removed after {desc} - if the latter is empty. - postfix : str, optional - Similar to `prefix`, but placed at the end - (e.g. for additional stats). - Note: postfix is a string for this method. Not a dict. - unit_divisor : float, optional - [default: 1000], ignored unless `unit_scale` is True. - - Returns - ------- - out : Formatted meter and stats, ready to display. - """ - - # sanity check: total - if total and n > total: - total = None - - # apply custom scale if necessary - if unit_scale and unit_scale not in (True, 1): - total *= unit_scale - n *= unit_scale - unit_scale = False - - format_interval = tqdm.format_interval - elapsed_str = format_interval(elapsed) - - # if unspecified, attempt to use rate = average speed - # (we allow manual override since predicting time is an arcane art) - if rate is None and elapsed: - rate = n / elapsed - inv_rate = 1 / rate if rate else None - format_sizeof = tqdm.format_sizeof - rate_noinv_fmt = ((format_sizeof(rate) if unit_scale else - '{0:5.2f}'.format(rate)) - if rate else '?') + unit + '/s' - rate_inv_fmt = ((format_sizeof(inv_rate) if unit_scale else - '{0:5.2f}'.format(inv_rate)) - if inv_rate else '?') + 's/' + unit - rate_fmt = rate_inv_fmt if inv_rate and inv_rate > 1 else rate_noinv_fmt - - if unit_scale: - n_fmt = format_sizeof(n, divisor=unit_divisor) - total_fmt = format_sizeof(total, divisor=unit_divisor) \ - if total else None - else: - n_fmt = str(n) - total_fmt = str(total) - - # total is known: we can predict some stats - if total: - # fractional and percentage progress - frac = n / total - percentage = frac * 100 - - remaining_str = format_interval((total - n) / rate) \ - if rate else '?' - - # format the stats displayed to the left and right sides of the bar - if prefix: - # old prefix setup work around - bool_prefix_colon_already = (prefix[-2:] == ": ") - l_bar = prefix if bool_prefix_colon_already else prefix + ": " - else: - l_bar = '' - l_bar += '{0:3.0f}%|'.format(percentage) - r_bar = '| {0}/{1} [{2}<{3}, {4}{5}]'.format( - n_fmt, total_fmt, elapsed_str, remaining_str, rate_fmt, - ', ' + postfix if postfix else '') - - if ncols == 0: - return l_bar[:-1] + r_bar[1:] - - if bar_format: - # Custom bar formatting - # Populate a dict with all available progress indicators - bar_args = {'n': n, - 'n_fmt': n_fmt, - 'total': total, - 'total_fmt': total_fmt, - 'percentage': percentage, - 'rate': inv_rate if inv_rate and inv_rate > 1 - else rate, - 'rate_fmt': rate_fmt, - 'rate_noinv': rate, - 'rate_noinv_fmt': rate_noinv_fmt, - 'rate_inv': inv_rate, - 'rate_inv_fmt': rate_inv_fmt, - 'elapsed': elapsed_str, - 'remaining': remaining_str, - 'l_bar': l_bar, - 'r_bar': r_bar, - 'desc': prefix or '', - 'postfix': ', ' + postfix if postfix else '', - # 'bar': full_bar # replaced by procedure below - } - - # auto-remove colon for empty `desc` - if not prefix: - bar_format = bar_format.replace("{desc}: ", '') - - # Interpolate supplied bar format with the dict - if '{bar}' in bar_format: - # Format left/right sides of the bar, and format the bar - # later in the remaining space (avoid breaking display) - l_bar_user, r_bar_user = bar_format.split('{bar}') - l_bar = l_bar_user.format(**bar_args) - r_bar = r_bar_user.format(**bar_args) - else: - # Else no progress bar, we can just format and return - return bar_format.format(**bar_args) - - # Formatting progress bar - # space available for bar's display - N_BARS = max(1, ncols - len(l_bar) - len(r_bar)) if ncols \ - else 10 - - # format bar depending on availability of unicode/ascii chars - if ascii: - bar_length, frac_bar_length = divmod( - int(frac * N_BARS * 10), 10) - - bar = '#' * bar_length - frac_bar = chr(48 + frac_bar_length) if frac_bar_length \ - else ' ' - - else: - bar_length, frac_bar_length = divmod(int(frac * N_BARS * 8), 8) - - bar = _unich(0x2588) * bar_length - frac_bar = _unich(0x2590 - frac_bar_length) \ - if frac_bar_length else ' ' - - # whitespace padding - if bar_length < N_BARS: - full_bar = bar + frac_bar + \ - ' ' * max(N_BARS - bar_length - 1, 0) - else: - full_bar = bar + \ - ' ' * max(N_BARS - bar_length, 0) - - # Piece together the bar parts - return l_bar + full_bar + r_bar - - # no total: no progressbar, ETA, just progress stats - else: - return ((prefix + ": ") if prefix else '') + \ - '{0}{1} [{2}, {3}{4}]'.format( - n_fmt, unit, elapsed_str, rate_fmt, - ', ' + postfix if postfix else '') - - def __new__(cls, *args, **kwargs): - # Create a new instance - instance = object.__new__(cls) - # Add to the list of instances - if "_instances" not in cls.__dict__: - cls._instances = WeakSet() - if "_lock" not in cls.__dict__: - cls._lock = TqdmDefaultWriteLock() - with cls._lock: - cls._instances.add(instance) - # Create the monitoring thread - if cls.monitor_interval and (cls.monitor is None or not - cls.monitor.report()): - try: - cls.monitor = TMonitor(cls, cls.monitor_interval) - except Exception as e: # pragma: nocover - warn("tqdm:disabling monitor support" - " (monitor_interval = 0) due to:\n" + str(e), - TqdmMonitorWarning) - cls.monitor_interval = 0 - # Return the instance - return instance - - @classmethod - def _get_free_pos(cls, instance=None): - """Skips specified instance""" - positions = set(abs(inst.pos) for inst in cls._instances - if inst is not instance) - return min(set(range(len(positions) + 1)).difference(positions)) - - @classmethod - def _decr_instances(cls, instance): - """ - Remove from list and reposition other bars - so that newer bars won't overlap previous bars - """ - with cls._lock: - try: - cls._instances.remove(instance) - except KeyError: - if not instance.gui: # pragma: no cover - raise - else: - for inst in cls._instances: - # negative `pos` means fixed - if inst.pos > abs(instance.pos): - inst.pos -= 1 - # TODO: check this doesn't overwrite another fixed bar - # Kill monitor if no instances are left - if not cls._instances and cls.monitor: - try: - cls.monitor.exit() - del cls.monitor - except AttributeError: # pragma: nocover - pass - else: - cls.monitor = None - - @classmethod - def write(cls, s, file=None, end="\n", nolock=False): - """ - Print a message via tqdm (without overlap with bars) - """ - fp = file if file is not None else sys.stdout - with cls.external_write_mode(file=file, nolock=nolock): - # Write the message - fp.write(s) - fp.write(end) - - @classmethod - @contextmanager - def external_write_mode(cls, file=None, nolock=False): - """ - Disable tqdm within context and refresh tqdm when exits. - Useful when writing to standard output stream - """ - fp = file if file is not None else sys.stdout - - if not nolock: - cls._lock.acquire() - # Clear all bars - inst_cleared = [] - for inst in getattr(cls, '_instances', []): - # Clear instance if in the target output file - # or if write output + tqdm output are both either - # sys.stdout or sys.stderr (because both are mixed in terminal) - if inst.fp == fp or all( - f in (sys.stdout, sys.stderr) for f in (fp, inst.fp)): - inst.clear(nolock=True) - inst_cleared.append(inst) - yield - # Force refresh display of bars we cleared - for inst in inst_cleared: - # Avoid race conditions by checking that the instance started - if hasattr(inst, 'start_t'): # pragma: nocover - inst.refresh(nolock=True) - if not nolock: - cls._lock.release() - - @classmethod - def set_lock(cls, lock): - cls._lock = lock - - @classmethod - def get_lock(cls): - return cls._lock - - @classmethod - def pandas(tclass, *targs, **tkwargs): - """ - Registers the given `tqdm` class with - pandas.core. - ( frame.DataFrame - | series.Series - | groupby.DataFrameGroupBy - | groupby.SeriesGroupBy - ).progress_apply - - A new instance will be create every time `progress_apply` is called, - and each instance will automatically close() upon completion. - - Parameters - ---------- - targs, tkwargs : arguments for the tqdm instance - - Examples - -------- - >>> import pandas as pd - >>> import numpy as np - >>> from tqdm import tqdm, tqdm_gui - >>> - >>> df = pd.DataFrame(np.random.randint(0, 100, (100000, 6))) - >>> tqdm.pandas(ncols=50) # can use tqdm_gui, optional kwargs, etc - >>> # Now you can use `progress_apply` instead of `apply` - >>> df.groupby(0).progress_apply(lambda x: x**2) - - References - ---------- - https://stackoverflow.com/questions/18603270/ - progress-indicator-during-pandas-operations-python - """ - from pandas.core.frame import DataFrame - from pandas.core.series import Series - from pandas.core.groupby import DataFrameGroupBy - from pandas.core.groupby import SeriesGroupBy - from pandas.core.groupby import GroupBy - from pandas.core.groupby import PanelGroupBy - from pandas import Panel - - deprecated_t = [tkwargs.pop('deprecated_t', None)] - - def inner_generator(df_function='apply'): - def inner(df, func, *args, **kwargs): - """ - Parameters - ---------- - df : (DataFrame|Series)[GroupBy] - Data (may be grouped). - func : function - To be applied on the (grouped) data. - **kwargs : optional - Transmitted to `df.apply()`. - """ - - # Precompute total iterations - total = getattr(df, 'ngroups', None) - if total is None: # not grouped - if df_function == 'applymap': - total = df.size - elif isinstance(df, Series): - total = len(df) - else: # DataFrame or Panel - axis = kwargs.get('axis', 0) - # when axis=0, total is shape[axis1] - total = df.size // df.shape[axis] - - # Init bar - if deprecated_t[0] is not None: - t = deprecated_t[0] - deprecated_t[0] = None - else: - t = tclass(*targs, total=total, **tkwargs) - - if len(args) > 0: - # *args intentionally not supported (see #244, #299) - TqdmDeprecationWarning( - "Except func, normal arguments are intentionally" + - " not supported by" + - " `(DataFrame|Series|GroupBy).progress_apply`." + - " Use keyword arguments instead.", - fp_write=getattr(t.fp, 'write', sys.stderr.write)) - - # Define bar updating wrapper - def wrapper(*args, **kwargs): - # update tbar correctly - # it seems `pandas apply` calls `func` twice - # on the first column/row to decide whether it can - # take a fast or slow code path; so stop when t.total==t.n - t.update(n=1 if t.total and t.n < t.total else 0) - return func(*args, **kwargs) - - # Apply the provided function (in **kwargs) - # on the df using our wrapper (which provides bar updating) - result = getattr(df, df_function)(wrapper, **kwargs) - - # Close bar and return pandas calculation result - t.close() - return result - - return inner - - # Monkeypatch pandas to provide easy methods - # Enable custom tqdm progress in pandas! - Series.progress_apply = inner_generator() - SeriesGroupBy.progress_apply = inner_generator() - Series.progress_map = inner_generator('map') - SeriesGroupBy.progress_map = inner_generator('map') - - DataFrame.progress_apply = inner_generator() - DataFrameGroupBy.progress_apply = inner_generator() - DataFrame.progress_applymap = inner_generator('applymap') - - Panel.progress_apply = inner_generator() - PanelGroupBy.progress_apply = inner_generator() - - GroupBy.progress_apply = inner_generator() - GroupBy.progress_aggregate = inner_generator('aggregate') - GroupBy.progress_transform = inner_generator('transform') - - def __init__(self, iterable=None, desc=None, total=None, leave=True, - file=None, ncols=None, mininterval=0.1, maxinterval=10.0, - miniters=None, ascii=None, disable=False, unit='it', - unit_scale=False, dynamic_ncols=False, smoothing=0.3, - bar_format=None, initial=0, position=None, postfix=None, - unit_divisor=1000, gui=False, **kwargs): - """ - Parameters - ---------- - iterable : iterable, optional - Iterable to decorate with a progressbar. - Leave blank to manually manage the updates. - desc : str, optional - Prefix for the progressbar. - total : int, optional - The number of expected iterations. If unspecified, - len(iterable) is used if possible. As a last resort, only basic - progress statistics are displayed (no ETA, no progressbar). - If `gui` is True and this parameter needs subsequent updating, - specify an initial arbitrary large positive integer, - e.g. int(9e9). - leave : bool, optional - If [default: True], keeps all traces of the progressbar - upon termination of iteration. - file : `io.TextIOWrapper` or `io.StringIO`, optional - Specifies where to output the progress messages - (default: sys.stderr). Uses `file.write(str)` and `file.flush()` - methods. - ncols : int, optional - The width of the entire output message. If specified, - dynamically resizes the progressbar to stay within this bound. - If unspecified, attempts to use environment width. The - fallback is a meter width of 10 and no limit for the counter and - statistics. If 0, will not print any meter (only stats). - mininterval : float, optional - Minimum progress display update interval, in seconds [default: 0.1]. - maxinterval : float, optional - Maximum progress display update interval, in seconds [default: 10]. - Automatically adjusts `miniters` to correspond to `mininterval` - after long display update lag. Only works if `dynamic_miniters` - or monitor thread is enabled. - miniters : int, optional - Minimum progress display update interval, in iterations. - If 0 and `dynamic_miniters`, will automatically adjust to equal - `mininterval` (more CPU efficient, good for tight loops). - If > 0, will skip display of specified number of iterations. - Tweak this and `mininterval` to get very efficient loops. - If your progress is erratic with both fast and slow iterations - (network, skipping items, etc) you should set miniters=1. - ascii : bool, optional - If unspecified or False, use unicode (smooth blocks) to fill - the meter. The fallback is to use ASCII characters `1-9 #`. - disable : bool, optional - Whether to disable the entire progressbar wrapper - [default: False]. If set to None, disable on non-TTY. - unit : str, optional - String that will be used to define the unit of each iteration - [default: it]. - unit_scale : bool or int or float, optional - If 1 or True, the number of iterations will be reduced/scaled - automatically and a metric prefix following the - International System of Units standard will be added - (kilo, mega, etc.) [default: False]. If any other non-zero - number, will scale `total` and `n`. - dynamic_ncols : bool, optional - If set, constantly alters `ncols` to the environment (allowing - for window resizes) [default: False]. - smoothing : float, optional - Exponential moving average smoothing factor for speed estimates - (ignored in GUI mode). Ranges from 0 (average speed) to 1 - (current/instantaneous speed) [default: 0.3]. - bar_format : str, optional - Specify a custom bar string formatting. May impact performance. - If unspecified, will use '{l_bar}{bar}{r_bar}', where l_bar is - '{desc}: {percentage:3.0f}%|' and r_bar is - '| {n_fmt}/{total_fmt} [{elapsed}<{remaining}, {rate_fmt}]' - Possible vars: bar, n, n_fmt, total, total_fmt, percentage, - rate, rate_fmt, elapsed, remaining, l_bar, r_bar, desc. - Note that a trailing ": " is automatically removed after {desc} - if the latter is empty. - initial : int, optional - The initial counter value. Useful when restarting a progress - bar [default: 0]. - position : int, optional - Specify the line offset to print this bar (starting from 0) - Automatic if unspecified. - Useful to manage multiple bars at once (eg, from threads). - postfix : dict, optional - Specify additional stats to display at the end of the bar. - Note: postfix is a dict ({'key': value} pairs) for this method, - not a string. - unit_divisor : float, optional - [default: 1000], ignored unless `unit_scale` is True. - gui : bool, optional - WARNING: internal parameter - do not use. - Use tqdm_gui(...) instead. If set, will attempt to use - matplotlib animations for a graphical output [default: False]. - - Returns - ------- - out : decorated iterator. - """ - - if file is None: - file = sys.stderr - - if disable is None and hasattr(file, "isatty") and not file.isatty(): - disable = True - - if disable: - self.iterable = iterable - self.disable = disable - self.pos = self._get_free_pos(self) - self._instances.remove(self) - self.n = initial - return - - if kwargs: - self.disable = True - self.pos = self._get_free_pos(self) - self._instances.remove(self) - raise (TqdmDeprecationWarning("""\ -`nested` is deprecated and automated. Use position instead for manual control. -""", fp_write=getattr(file, 'write', sys.stderr.write)) if "nested" in kwargs - else TqdmKeyError("Unknown argument(s): " + str(kwargs))) - - # Preprocess the arguments - if total is None and iterable is not None: - try: - total = len(iterable) - except (TypeError, AttributeError): - total = None - - if ((ncols is None) and (file in (sys.stderr, sys.stdout))) or \ - dynamic_ncols: # pragma: no cover - if dynamic_ncols: - dynamic_ncols = _environ_cols_wrapper() - if dynamic_ncols: - ncols = dynamic_ncols(file) - # elif ncols is not None: - # ncols = 79 - else: - _dynamic_ncols = _environ_cols_wrapper() - if _dynamic_ncols: - ncols = _dynamic_ncols(file) - # else: - # ncols = 79 - - if miniters is None: - miniters = 0 - dynamic_miniters = True - else: - dynamic_miniters = False - - if mininterval is None: - mininterval = 0 - - if maxinterval is None: - maxinterval = 0 - - if ascii is None: - ascii = not _supports_unicode(file) - - if bar_format and not ascii: - # Convert bar format into unicode since terminal uses unicode - bar_format = _unicode(bar_format) - - if smoothing is None: - smoothing = 0 - - # Store the arguments - self.iterable = iterable - self.desc = desc or '' - self.total = total - self.leave = leave - self.fp = file - self.ncols = ncols - self.mininterval = mininterval - self.maxinterval = maxinterval - self.miniters = miniters - self.dynamic_miniters = dynamic_miniters - self.ascii = ascii - self.disable = disable - self.unit = unit - self.unit_scale = unit_scale - self.unit_divisor = unit_divisor - self.gui = gui - self.dynamic_ncols = dynamic_ncols - self.smoothing = smoothing - self.avg_time = None - self._time = time - self.bar_format = bar_format - self.postfix = None - if postfix: - self.set_postfix(refresh=False, **postfix) - - # Init the iterations counters - self.last_print_n = initial - self.n = initial - - # if nested, at initial sp() call we replace '\r' by '\n' to - # not overwrite the outer progress bar - if position is None: - self.pos = self._get_free_pos(self) - else: # mark fixed positions as negative - self.pos = -position - - if not gui: - # Initialize the screen printer - self.sp = self.status_printer(self.fp) - with self._lock: - if self.pos: - self.moveto(abs(self.pos)) - self.sp(self.__repr__(elapsed=0)) - if self.pos: - self.moveto(-abs(self.pos)) - - # Init the time counter - self.last_print_t = self._time() - # NB: Avoid race conditions by setting start_t at the very end of init - self.start_t = self.last_print_t - - def __len__(self): - return self.total if self.iterable is None else \ - (self.iterable.shape[0] if hasattr(self.iterable, "shape") - else len(self.iterable) if hasattr(self.iterable, "__len__") - else self.total) - - def __enter__(self): - return self - - def __exit__(self, *exc): - self.close() - return False - - def __del__(self): - self.close() - - def __repr__(self, elapsed=None): - return self.format_meter( - self.n, self.total, - elapsed if elapsed is not None else self._time() - self.start_t, - self.dynamic_ncols(self.fp) if self.dynamic_ncols else self.ncols, - self.desc, self.ascii, self.unit, - self.unit_scale, 1 / self.avg_time if self.avg_time else None, - self.bar_format, self.postfix, self.unit_divisor) - - def __lt__(self, other): - return abs(self.pos) < abs(other.pos) - - def __le__(self, other): - return (self < other) or (self == other) - - def __eq__(self, other): - return abs(self.pos) == abs(other.pos) - - def __ne__(self, other): - return not (self == other) - - def __gt__(self, other): - return not (self <= other) - - def __ge__(self, other): - return not (self < other) - - def __hash__(self): - return id(self) - - def __iter__(self): - """Backward-compatibility to use: for x in tqdm(iterable)""" - - # Inlining instance variables as locals (speed optimisation) - iterable = self.iterable - - # If the bar is disabled, then just walk the iterable - # (note: keep this check outside the loop for performance) - if self.disable: - for obj in iterable: - yield obj - else: - mininterval = self.mininterval - maxinterval = self.maxinterval - miniters = self.miniters - dynamic_miniters = self.dynamic_miniters - last_print_t = self.last_print_t - last_print_n = self.last_print_n - n = self.n - smoothing = self.smoothing - avg_time = self.avg_time - _time = self._time - - try: - sp = self.sp - except AttributeError: - raise TqdmDeprecationWarning("""\ -Please use `tqdm_gui(...)` instead of `tqdm(..., gui=True)` -""", fp_write=getattr(self.fp, 'write', sys.stderr.write)) - - for obj in iterable: - yield obj - # Update and possibly print the progressbar. - # Note: does not call self.update(1) for speed optimisation. - n += 1 - # check counter first to avoid calls to time() - if n - last_print_n >= self.miniters: - miniters = self.miniters # watch monitoring thread changes - delta_t = _time() - last_print_t - if delta_t >= mininterval: - cur_t = _time() - delta_it = n - last_print_n - # EMA (not just overall average) - if smoothing and delta_t and delta_it: - avg_time = delta_t / delta_it \ - if avg_time is None \ - else smoothing * delta_t / delta_it + \ - (1 - smoothing) * avg_time - - self.n = n - with self._lock: - if self.pos: - self.moveto(abs(self.pos)) - # Print bar update - sp(self.__repr__()) - if self.pos: - self.moveto(-abs(self.pos)) - - # If no `miniters` was specified, adjust automatically - # to the max iteration rate seen so far between 2 prints - if dynamic_miniters: - if maxinterval and delta_t >= maxinterval: - # Adjust miniters to time interval by rule of 3 - if mininterval: - # Set miniters to correspond to mininterval - miniters = delta_it * mininterval / delta_t - else: - # Set miniters to correspond to maxinterval - miniters = delta_it * maxinterval / delta_t - elif smoothing: - # EMA-weight miniters to converge - # towards the timeframe of mininterval - miniters = smoothing * delta_it * \ - (mininterval / delta_t - if mininterval and delta_t else 1) + \ - (1 - smoothing) * miniters - else: - # Maximum nb of iterations between 2 prints - miniters = max(miniters, delta_it) - - # Store old values for next call - self.n = self.last_print_n = last_print_n = n - self.last_print_t = last_print_t = cur_t - self.miniters = miniters - - # Closing the progress bar. - # Update some internal variables for close(). - self.last_print_n = last_print_n - self.n = n - self.miniters = miniters - self.close() - - def update(self, n=1): - """ - Manually update the progress bar, useful for streams - such as reading files. - E.g.: - >>> t = tqdm(total=filesize) # Initialise - >>> for current_buffer in stream: - ... ... - ... t.update(len(current_buffer)) - >>> t.close() - The last line is highly recommended, but possibly not necessary if - `t.update()` will be called in such a way that `filesize` will be - exactly reached and printed. - - Parameters - ---------- - n : int, optional - Increment to add to the internal counter of iterations - [default: 1]. - """ - # N.B.: see __iter__() for more comments. - if self.disable: - return - - if n < 0: - raise ValueError("n ({0}) cannot be negative".format(n)) - self.n += n - - # check counter first to reduce calls to time() - if self.n - self.last_print_n >= self.miniters: - delta_t = self._time() - self.last_print_t - if delta_t >= self.mininterval: - cur_t = self._time() - delta_it = self.n - self.last_print_n # >= n - # elapsed = cur_t - self.start_t - # EMA (not just overall average) - if self.smoothing and delta_t and delta_it: - self.avg_time = delta_t / delta_it \ - if self.avg_time is None \ - else self.smoothing * delta_t / delta_it + \ - (1 - self.smoothing) * self.avg_time - - if not hasattr(self, "sp"): - raise TqdmDeprecationWarning("""\ -Please use `tqdm_gui(...)` instead of `tqdm(..., gui=True)` -""", fp_write=getattr(self.fp, 'write', sys.stderr.write)) - - with self._lock: - if self.pos: - self.moveto(abs(self.pos)) - - # Print bar update - self.sp(self.__repr__()) - - if self.pos: - self.moveto(-abs(self.pos)) - - # If no `miniters` was specified, adjust automatically to the - # maximum iteration rate seen so far between two prints. - # e.g.: After running `tqdm.update(5)`, subsequent - # calls to `tqdm.update()` will only cause an update after - # at least 5 more iterations. - if self.dynamic_miniters: - if self.maxinterval and delta_t >= self.maxinterval: - if self.mininterval: - self.miniters = delta_it * self.mininterval \ - / delta_t - else: - self.miniters = delta_it * self.maxinterval \ - / delta_t - elif self.smoothing: - self.miniters = self.smoothing * delta_it * \ - (self.mininterval / delta_t - if self.mininterval and delta_t - else 1) + \ - (1 - self.smoothing) * self.miniters - else: - self.miniters = max(self.miniters, delta_it) - - # Store old values for next call - self.last_print_n = self.n - self.last_print_t = cur_t - - def close(self): - """ - Cleanup and (if leave=False) close the progressbar. - """ - if self.disable: - return - - # Prevent multiple closures - self.disable = True - - # decrement instance pos and remove from internal set - pos = abs(self.pos) - self._decr_instances(self) - - # GUI mode - if not hasattr(self, "sp"): - return - - # annoyingly, _supports_unicode isn't good enough - def fp_write(s): - self.fp.write(_unicode(s)) - - try: - fp_write('') - except ValueError as e: - if 'closed' in str(e): - return - raise # pragma: no cover - - with self._lock: - if pos: - self.moveto(pos) - - if self.leave: - if self.last_print_n < self.n: - # stats for overall rate (no weighted average) - self.avg_time = None - self.sp(self.__repr__()) - if pos: - self.moveto(-pos) - else: - fp_write('\n') - else: - self.sp('') # clear up last bar - if pos: - self.moveto(-pos) - else: - fp_write('\r') - - def unpause(self): - """ - Restart tqdm timer from last print time. - """ - cur_t = self._time() - self.start_t += cur_t - self.last_print_t - self.last_print_t = cur_t - - def set_description(self, desc=None, refresh=True): - """ - Set/modify description of the progress bar. - - Parameters - ---------- - desc : str, optional - refresh : bool, optional - Forces refresh [default: True]. - """ - self.desc = desc + ': ' if desc else '' - if refresh: - self.refresh() - - def set_description_str(self, desc=None, refresh=True): - """ - Set/modify description without ': ' appended. - """ - self.desc = desc or '' - if refresh: - self.refresh() - - def set_postfix(self, ordered_dict=None, refresh=True, **kwargs): - """ - Set/modify postfix (additional stats) - with automatic formatting based on datatype. - - Parameters - ---------- - ordered_dict : dict or OrderedDict, optional - refresh : bool, optional - Forces refresh [default: True]. - kwargs : dict, optional - """ - # Sort in alphabetical order to be more deterministic - postfix = _OrderedDict([] if ordered_dict is None else ordered_dict) - for key in sorted(kwargs.keys()): - postfix[key] = kwargs[key] - # Preprocess stats according to datatype - for key in postfix.keys(): - # Number: limit the length of the string - if isinstance(postfix[key], Number): - postfix[key] = '{0:2.3g}'.format(postfix[key]) - # Else for any other type, try to get the string conversion - elif not isinstance(postfix[key], _basestring): - postfix[key] = str(postfix[key]) - # Else if it's a string, don't need to preprocess anything - # Stitch together to get the final postfix - self.postfix = ', '.join(key + '=' + postfix[key].strip() - for key in postfix.keys()) - if refresh: - self.refresh() - - def set_postfix_str(self, s='', refresh=True): - """ - Postfix without dictionary expansion, similar to prefix handling. - """ - self.postfix = str(s) - if refresh: - self.refresh() - - def moveto(self, n): - self.fp.write(_unicode('\n' * n + _term_move_up() * -n)) - self.fp.flush() - - def clear(self, nolock=False): - """ - Clear current bar display - """ - if self.disable: - return - - if not nolock: - self._lock.acquire() - self.moveto(abs(self.pos)) - self.sp('') - self.fp.write('\r') # place cursor back at the beginning of line - self.moveto(-abs(self.pos)) - if not nolock: - self._lock.release() - - def refresh(self, nolock=False): - """ - Force refresh the display of this bar - """ - if self.disable: - return - - if not nolock: - self._lock.acquire() - self.moveto(abs(self.pos)) - self.sp(self.__repr__()) - self.moveto(-abs(self.pos)) - if not nolock: - self._lock.release() - - -def trange(*args, **kwargs): - """ - A shortcut for tqdm(xrange(*args), **kwargs). - On Python3+ range is used instead of xrange. - """ - return tqdm(_range(*args), **kwargs) diff --git a/lib/tqdm/_tqdm_gui.py b/lib/tqdm/_tqdm_gui.py deleted file mode 100644 index 13420ef7..00000000 --- a/lib/tqdm/_tqdm_gui.py +++ /dev/null @@ -1,351 +0,0 @@ -""" -GUI progressbar decorator for iterators. -Includes a default (x)range iterator printing to stderr. - -Usage: - >>> from tqdm_gui import tgrange[, tqdm_gui] - >>> for i in tgrange(10): #same as: for i in tqdm_gui(xrange(10)) - ... ... -""" -# future division is important to divide integers and get as -# a result precise floating numbers (instead of truncated int) -from __future__ import division, absolute_import -# import compatibility functions and utilities -# import sys -from time import time -from ._utils import _range -# to inherit from the tqdm class -from ._tqdm import tqdm, TqdmExperimentalWarning -from warnings import warn - - -__author__ = {"github.com/": ["casperdcl", "lrq3000"]} -__all__ = ['tqdm_gui', 'tgrange'] - - -class tqdm_gui(tqdm): # pragma: no cover - """ - Experimental GUI version of tqdm! - """ - - # TODO: @classmethod: write() on GUI? - - def __init__(self, *args, **kwargs): - import matplotlib as mpl - import matplotlib.pyplot as plt - from collections import deque - kwargs['gui'] = True - - super(tqdm_gui, self).__init__(*args, **kwargs) - - # Initialize the GUI display - if self.disable or not kwargs['gui']: - return - - warn('GUI is experimental/alpha', TqdmExperimentalWarning) - self.mpl = mpl - self.plt = plt - self.sp = None - - # Remember if external environment uses toolbars - self.toolbar = self.mpl.rcParams['toolbar'] - self.mpl.rcParams['toolbar'] = 'None' - - self.mininterval = max(self.mininterval, 0.5) - self.fig, ax = plt.subplots(figsize=(9, 2.2)) - # self.fig.subplots_adjust(bottom=0.2) - if self.total: - self.xdata = [] - self.ydata = [] - self.zdata = [] - else: - self.xdata = deque([]) - self.ydata = deque([]) - self.zdata = deque([]) - self.line1, = ax.plot(self.xdata, self.ydata, color='b') - self.line2, = ax.plot(self.xdata, self.zdata, color='k') - ax.set_ylim(0, 0.001) - if self.total: - ax.set_xlim(0, 100) - ax.set_xlabel('percent') - self.fig.legend((self.line1, self.line2), ('cur', 'est'), - loc='center right') - # progressbar - self.hspan = plt.axhspan(0, 0.001, - xmin=0, xmax=0, color='g') - else: - # ax.set_xlim(-60, 0) - ax.set_xlim(0, 60) - ax.invert_xaxis() - ax.set_xlabel('seconds') - ax.legend(('cur', 'est'), loc='lower left') - ax.grid() - # ax.set_xlabel('seconds') - ax.set_ylabel((self.unit if self.unit else 'it') + '/s') - if self.unit_scale: - plt.ticklabel_format(style='sci', axis='y', - scilimits=(0, 0)) - ax.yaxis.get_offset_text().set_x(-0.15) - - # Remember if external environment is interactive - self.wasion = plt.isinteractive() - plt.ion() - self.ax = ax - - def __iter__(self): - # TODO: somehow allow the following: - # if not self.gui: - # return super(tqdm_gui, self).__iter__() - iterable = self.iterable - if self.disable: - for obj in iterable: - yield obj - return - - # ncols = self.ncols - mininterval = self.mininterval - maxinterval = self.maxinterval - miniters = self.miniters - dynamic_miniters = self.dynamic_miniters - unit = self.unit - unit_scale = self.unit_scale - ascii = self.ascii - start_t = self.start_t - last_print_t = self.last_print_t - last_print_n = self.last_print_n - n = self.n - # dynamic_ncols = self.dynamic_ncols - smoothing = self.smoothing - avg_time = self.avg_time - bar_format = self.bar_format - - plt = self.plt - ax = self.ax - xdata = self.xdata - ydata = self.ydata - zdata = self.zdata - line1 = self.line1 - line2 = self.line2 - - for obj in iterable: - yield obj - # Update and print the progressbar. - # Note: does not call self.update(1) for speed optimisation. - n += 1 - delta_it = n - last_print_n - # check the counter first (avoid calls to time()) - if delta_it >= miniters: - cur_t = time() - delta_t = cur_t - last_print_t - if delta_t >= mininterval: - elapsed = cur_t - start_t - # EMA (not just overall average) - if smoothing and delta_t: - avg_time = delta_t / delta_it \ - if avg_time is None \ - else smoothing * delta_t / delta_it + \ - (1 - smoothing) * avg_time - - # Inline due to multiple calls - total = self.total - # instantaneous rate - y = delta_it / delta_t - # overall rate - z = n / elapsed - # update line data - xdata.append(n * 100.0 / total if total else cur_t) - ydata.append(y) - zdata.append(z) - - # Discard old values - # xmin, xmax = ax.get_xlim() - # if (not total) and elapsed > xmin * 1.1: - if (not total) and elapsed > 66: - xdata.popleft() - ydata.popleft() - zdata.popleft() - - ymin, ymax = ax.get_ylim() - if y > ymax or z > ymax: - ymax = 1.1 * y - ax.set_ylim(ymin, ymax) - ax.figure.canvas.draw() - - if total: - line1.set_data(xdata, ydata) - line2.set_data(xdata, zdata) - try: - poly_lims = self.hspan.get_xy() - except AttributeError: - self.hspan = plt.axhspan(0, 0.001, xmin=0, - xmax=0, color='g') - poly_lims = self.hspan.get_xy() - poly_lims[0, 1] = ymin - poly_lims[1, 1] = ymax - poly_lims[2] = [n / total, ymax] - poly_lims[3] = [poly_lims[2, 0], ymin] - if len(poly_lims) > 4: - poly_lims[4, 1] = ymin - self.hspan.set_xy(poly_lims) - else: - t_ago = [cur_t - i for i in xdata] - line1.set_data(t_ago, ydata) - line2.set_data(t_ago, zdata) - - ax.set_title(self.format_meter( - n, total, elapsed, 0, - self.desc, ascii, unit, unit_scale, - 1 / avg_time if avg_time else None, bar_format), - fontname="DejaVu Sans Mono", fontsize=11) - plt.pause(1e-9) - - # If no `miniters` was specified, adjust automatically - # to the maximum iteration rate seen so far. - if dynamic_miniters: - if maxinterval and delta_t > maxinterval: - # Set miniters to correspond to maxinterval - miniters = delta_it * maxinterval / delta_t - elif mininterval and delta_t: - # EMA-weight miniters to converge - # towards the timeframe of mininterval - miniters = smoothing * delta_it * mininterval \ - / delta_t + (1 - smoothing) * miniters - else: - miniters = smoothing * delta_it + \ - (1 - smoothing) * miniters - - # Store old values for next call - last_print_n = n - last_print_t = cur_t - - # Closing the progress bar. - # Update some internal variables for close(). - self.last_print_n = last_print_n - self.n = n - self.close() - - def update(self, n=1): - # if not self.gui: - # return super(tqdm_gui, self).close() - if self.disable: - return - - if n < 0: - n = 1 - self.n += n - - delta_it = self.n - self.last_print_n # should be n? - if delta_it >= self.miniters: - # We check the counter first, to reduce the overhead of time() - cur_t = time() - delta_t = cur_t - self.last_print_t - if delta_t >= self.mininterval: - elapsed = cur_t - self.start_t - # EMA (not just overall average) - if self.smoothing and delta_t: - self.avg_time = delta_t / delta_it \ - if self.avg_time is None \ - else self.smoothing * delta_t / delta_it + \ - (1 - self.smoothing) * self.avg_time - - # Inline due to multiple calls - total = self.total - ax = self.ax - - # instantaneous rate - y = delta_it / delta_t - # smoothed rate - z = self.n / elapsed - # update line data - self.xdata.append(self.n * 100.0 / total - if total else cur_t) - self.ydata.append(y) - self.zdata.append(z) - - # Discard old values - if (not total) and elapsed > 66: - self.xdata.popleft() - self.ydata.popleft() - self.zdata.popleft() - - ymin, ymax = ax.get_ylim() - if y > ymax or z > ymax: - ymax = 1.1 * y - ax.set_ylim(ymin, ymax) - ax.figure.canvas.draw() - - if total: - self.line1.set_data(self.xdata, self.ydata) - self.line2.set_data(self.xdata, self.zdata) - try: - poly_lims = self.hspan.get_xy() - except AttributeError: - self.hspan = self.plt.axhspan(0, 0.001, xmin=0, - xmax=0, color='g') - poly_lims = self.hspan.get_xy() - poly_lims[0, 1] = ymin - poly_lims[1, 1] = ymax - poly_lims[2] = [self.n / total, ymax] - poly_lims[3] = [poly_lims[2, 0], ymin] - if len(poly_lims) > 4: - poly_lims[4, 1] = ymin - self.hspan.set_xy(poly_lims) - else: - t_ago = [cur_t - i for i in self.xdata] - self.line1.set_data(t_ago, self.ydata) - self.line2.set_data(t_ago, self.zdata) - - ax.set_title(self.format_meter( - self.n, total, elapsed, 0, - self.desc, self.ascii, self.unit, self.unit_scale, - 1 / self.avg_time if self.avg_time else None, - self.bar_format), - fontname="DejaVu Sans Mono", fontsize=11) - self.plt.pause(1e-9) - - # If no `miniters` was specified, adjust automatically to the - # maximum iteration rate seen so far. - # e.g.: After running `tqdm.update(5)`, subsequent - # calls to `tqdm.update()` will only cause an update after - # at least 5 more iterations. - if self.dynamic_miniters: - if self.maxinterval and delta_t > self.maxinterval: - self.miniters = self.miniters * self.maxinterval \ - / delta_t - elif self.mininterval and delta_t: - self.miniters = self.smoothing * delta_it \ - * self.mininterval / delta_t + \ - (1 - self.smoothing) * self.miniters - else: - self.miniters = self.smoothing * delta_it + \ - (1 - self.smoothing) * self.miniters - - # Store old values for next call - self.last_print_n = self.n - self.last_print_t = cur_t - - def close(self): - # if not self.gui: - # return super(tqdm_gui, self).close() - if self.disable: - return - - self.disable = True - - self._instances.remove(self) - - # Restore toolbars - self.mpl.rcParams['toolbar'] = self.toolbar - # Return to non-interactive mode - if not self.wasion: - self.plt.ioff() - if not self.leave: - self.plt.close(self.fig) - - -def tgrange(*args, **kwargs): - """ - A shortcut for tqdm_gui(xrange(*args), **kwargs). - On Python3+ range is used instead of xrange. - """ - return tqdm_gui(_range(*args), **kwargs) diff --git a/lib/tqdm/_tqdm_notebook.py b/lib/tqdm/_tqdm_notebook.py deleted file mode 100644 index 5189d8c3..00000000 --- a/lib/tqdm/_tqdm_notebook.py +++ /dev/null @@ -1,236 +0,0 @@ -""" -IPython/Jupyter Notebook progressbar decorator for iterators. -Includes a default (x)range iterator printing to stderr. - -Usage: - >>> from tqdm_notebook import tnrange[, tqdm_notebook] - >>> for i in tnrange(10): #same as: for i in tqdm_notebook(xrange(10)) - ... ... -""" -# future division is important to divide integers and get as -# a result precise floating numbers (instead of truncated int) -from __future__ import division, absolute_import -# import compatibility functions and utilities -import sys -from ._utils import _range -# to inherit from the tqdm class -from ._tqdm import tqdm - - -if True: # pragma: no cover - # import IPython/Jupyter base widget and display utilities - try: # IPython 4.x - import ipywidgets - IPY = 4 - except ImportError: # IPython 3.x / 2.x - IPY = 32 - import warnings - with warnings.catch_warnings(): - ipy_deprecation_msg = "The `IPython.html` package" \ - " has been deprecated" - warnings.filterwarnings('error', - message=".*" + ipy_deprecation_msg + ".*") - try: - import IPython.html.widgets as ipywidgets - except Warning as e: - if ipy_deprecation_msg not in str(e): - raise - warnings.simplefilter('ignore') - try: - import IPython.html.widgets as ipywidgets # NOQA - except ImportError: - pass - except ImportError: - pass - - try: # IPython 4.x / 3.x - if IPY == 32: - from IPython.html.widgets import IntProgress, HBox, HTML - IPY = 3 - else: - from ipywidgets import IntProgress, HBox, HTML - except ImportError: - try: # IPython 2.x - from IPython.html.widgets import IntProgressWidget as IntProgress - from IPython.html.widgets import ContainerWidget as HBox - from IPython.html.widgets import HTML - IPY = 2 - except ImportError: - IPY = 0 - - try: - from IPython.display import display # , clear_output - except ImportError: - pass - - # HTML encoding - try: # Py3 - from html import escape - except ImportError: # Py2 - from cgi import escape - - -__author__ = {"github.com/": ["lrq3000", "casperdcl", "alexanderkuk"]} -__all__ = ['tqdm_notebook', 'tnrange'] - - -class tqdm_notebook(tqdm): - """ - Experimental IPython/Jupyter Notebook widget using tqdm! - """ - - @staticmethod - def status_printer(_, total=None, desc=None): - """ - Manage the printing of an IPython/Jupyter Notebook progress bar widget. - """ - # Fallback to text bar if there's no total - # DEPRECATED: replaced with an 'info' style bar - # if not total: - # return super(tqdm_notebook, tqdm_notebook).status_printer(file) - - # fp = file - - # Prepare IPython progress bar - if total: - pbar = IntProgress(min=0, max=total) - else: # No total? Show info style bar with no progress tqdm status - pbar = IntProgress(min=0, max=1) - pbar.value = 1 - pbar.bar_style = 'info' - if desc: - pbar.description = desc - # Prepare status text - ptext = HTML() - # Only way to place text to the right of the bar is to use a container - container = HBox(children=[pbar, ptext]) - display(container) - - def print_status(s='', close=False, bar_style=None, desc=None): - # Note: contrary to native tqdm, s='' does NOT clear bar - # goal is to keep all infos if error happens so user knows - # at which iteration the loop failed. - - # Clear previous output (really necessary?) - # clear_output(wait=1) - - # Get current iteration value from format_meter string - if total: - # n = None - if s: - npos = s.find(r'/|/') # cause we use bar_format=r'{n}|...' - # Check that n can be found in s (else n > total) - if npos >= 0: - n = int(s[:npos]) # get n from string - s = s[npos + 3:] # remove from string - - # Update bar with current n value - if n is not None: - pbar.value = n - - # Print stats - if s: # never clear the bar (signal: s='') - s = s.replace('||', '') # remove inesthetical pipes - s = escape(s) # html escape special characters (like '?') - ptext.value = s - - # Change bar style - if bar_style: - # Hack-ish way to avoid the danger bar_style being overriden by - # success because the bar gets closed after the error... - if not (pbar.bar_style == 'danger' and bar_style == 'success'): - pbar.bar_style = bar_style - - # Special signal to close the bar - if close and pbar.bar_style != 'danger': # hide only if no error - try: - container.close() - except AttributeError: - container.visible = False - - # Update description - if desc: - pbar.description = desc - - return print_status - - def __init__(self, *args, **kwargs): - # Setup default output - if kwargs.get('file', sys.stderr) is sys.stderr: - kwargs['file'] = sys.stdout # avoid the red block in IPython - - # Remove the bar from the printed string, only print stats - if not kwargs.get('bar_format', None): - kwargs['bar_format'] = r'{n}/|/{l_bar}{r_bar}' - - # Initialize parent class + avoid printing by using gui=True - kwargs['gui'] = True - super(tqdm_notebook, self).__init__(*args, **kwargs) - if self.disable or not kwargs['gui']: - return - - # Delete first pbar generated from super() (wrong total and text) - # DEPRECATED by using gui=True - # self.sp('', close=True) - # Replace with IPython progress bar display (with correct total) - self.sp = self.status_printer(self.fp, self.total, self.desc) - self.desc = None # trick to place description before the bar - - # Print initial bar state - if not self.disable: - self.sp(self.__repr__()) # same as self.refresh without clearing - - def __iter__(self, *args, **kwargs): - try: - for obj in super(tqdm_notebook, self).__iter__(*args, **kwargs): - # return super(tqdm...) will not catch exception - yield obj - # NB: except ... [ as ...] breaks IPython async KeyboardInterrupt - except: - self.sp(bar_style='danger') - raise - - def update(self, *args, **kwargs): - try: - super(tqdm_notebook, self).update(*args, **kwargs) - except Exception as exc: - # cannot catch KeyboardInterrupt when using manual tqdm - # as the interrupt will most likely happen on another statement - self.sp(bar_style='danger') - raise exc - - def close(self, *args, **kwargs): - super(tqdm_notebook, self).close(*args, **kwargs) - # If it was not run in a notebook, sp is not assigned, check for it - if hasattr(self, 'sp'): - # Try to detect if there was an error or KeyboardInterrupt - # in manual mode: if n < total, things probably got wrong - if self.total and self.n < self.total: - self.sp(bar_style='danger') - else: - if self.leave: - self.sp(bar_style='success') - else: - self.sp(close=True) - - def moveto(self, *args, **kwargs): - # void -> avoid extraneous `\n` in IPython output cell - return - - def set_description(self, desc=None, **_): - """ - Set/modify description of the progress bar. - - Parameters - ---------- - desc : str, optional - """ - self.sp(desc=desc) - - -def tnrange(*args, **kwargs): - """ - A shortcut for tqdm_notebook(xrange(*args), **kwargs). - On Python3+ range is used instead of xrange. - """ - return tqdm_notebook(_range(*args), **kwargs) diff --git a/lib/tqdm/_tqdm_pandas.py b/lib/tqdm/_tqdm_pandas.py deleted file mode 100644 index 234fafff..00000000 --- a/lib/tqdm/_tqdm_pandas.py +++ /dev/null @@ -1,46 +0,0 @@ -import sys - -__author__ = "github.com/casperdcl" -__all__ = ['tqdm_pandas'] - - -def tqdm_pandas(tclass, *targs, **tkwargs): - """ - Registers the given `tqdm` instance with - `pandas.core.groupby.DataFrameGroupBy.progress_apply`. - It will even close() the `tqdm` instance upon completion. - - Parameters - ---------- - tclass : tqdm class you want to use (eg, tqdm, tqdm_notebook, etc) - targs and tkwargs : arguments for the tqdm instance - - Examples - -------- - >>> import pandas as pd - >>> import numpy as np - >>> from tqdm import tqdm, tqdm_pandas - >>> - >>> df = pd.DataFrame(np.random.randint(0, 100, (100000, 6))) - >>> tqdm_pandas(tqdm, leave=True) # can use tqdm_gui, optional kwargs, etc - >>> # Now you can use `progress_apply` instead of `apply` - >>> df.groupby(0).progress_apply(lambda x: x**2) - - References - ---------- - https://stackoverflow.com/questions/18603270/ - progress-indicator-during-pandas-operations-python - """ - from tqdm import TqdmDeprecationWarning - - if isinstance(tclass, type) or (getattr(tclass, '__name__', '').startswith( - 'tqdm_')): # delayed adapter case - TqdmDeprecationWarning("""\ -Please use `tqdm.pandas(...)` instead of `tqdm_pandas(tqdm, ...)`. -""", fp_write=getattr(tkwargs.get('file', None), 'write', sys.stderr.write)) - tclass.pandas(*targs, **tkwargs) - else: - TqdmDeprecationWarning("""\ -Please use `tqdm.pandas(...)` instead of `tqdm_pandas(tqdm(...))`. -""", fp_write=getattr(tclass.fp, 'write', sys.stderr.write)) - type(tclass).pandas(deprecated_t=tclass) diff --git a/lib/tqdm/_utils.py b/lib/tqdm/_utils.py deleted file mode 100644 index 981a7997..00000000 --- a/lib/tqdm/_utils.py +++ /dev/null @@ -1,215 +0,0 @@ -import os -import subprocess -from platform import system as _curos -CUR_OS = _curos() -IS_WIN = CUR_OS in ['Windows', 'cli'] -IS_NIX = (not IS_WIN) and any( - CUR_OS.startswith(i) for i in - ['CYGWIN', 'MSYS', 'Linux', 'Darwin', 'SunOS', 'FreeBSD', 'NetBSD']) - - -# Py2/3 compat. Empty conditional to avoid coverage -if True: # pragma: no cover - try: - _range = xrange - except NameError: - _range = range - - try: - _unich = unichr - except NameError: - _unich = chr - - try: - _unicode = unicode - except NameError: - _unicode = str - - try: - if IS_WIN: - import colorama - colorama.init() - else: - colorama = None - except ImportError: - colorama = None - - try: - from weakref import WeakSet - except ImportError: - WeakSet = set - - try: - _basestring = basestring - except NameError: - _basestring = str - - try: # py>=2.7,>=3.1 - from collections import OrderedDict as _OrderedDict - except ImportError: - try: # older Python versions with backported ordereddict lib - from ordereddict import OrderedDict as _OrderedDict - except ImportError: # older Python versions without ordereddict lib - # Py2.6,3.0 compat, from PEP 372 - from collections import MutableMapping - - class _OrderedDict(dict, MutableMapping): - # Methods with direct access to underlying attributes - def __init__(self, *args, **kwds): - if len(args) > 1: - raise TypeError('expected at 1 argument, got %d', - len(args)) - if not hasattr(self, '_keys'): - self._keys = [] - self.update(*args, **kwds) - - def clear(self): - del self._keys[:] - dict.clear(self) - - def __setitem__(self, key, value): - if key not in self: - self._keys.append(key) - dict.__setitem__(self, key, value) - - def __delitem__(self, key): - dict.__delitem__(self, key) - self._keys.remove(key) - - def __iter__(self): - return iter(self._keys) - - def __reversed__(self): - return reversed(self._keys) - - def popitem(self): - if not self: - raise KeyError - key = self._keys.pop() - value = dict.pop(self, key) - return key, value - - def __reduce__(self): - items = [[k, self[k]] for k in self] - inst_dict = vars(self).copy() - inst_dict.pop('_keys', None) - return self.__class__, (items,), inst_dict - - # Methods with indirect access via the above methods - setdefault = MutableMapping.setdefault - update = MutableMapping.update - pop = MutableMapping.pop - keys = MutableMapping.keys - values = MutableMapping.values - items = MutableMapping.items - - def __repr__(self): - pairs = ', '.join(map('%r: %r'.__mod__, self.items())) - return '%s({%s})' % (self.__class__.__name__, pairs) - - def copy(self): - return self.__class__(self) - - @classmethod - def fromkeys(cls, iterable, value=None): - d = cls() - for key in iterable: - d[key] = value - return d - - -def _is_utf(encoding): - try: - u'\u2588\u2589'.encode(encoding) - except UnicodeEncodeError: # pragma: no cover - return False - except Exception: # pragma: no cover - try: - return encoding.lower().startswith('utf-') or ('U8' == encoding) - except: - return False - else: - return True - - -def _supports_unicode(fp): - try: - return _is_utf(fp.encoding) - except AttributeError: - return False - - -def _environ_cols_wrapper(): # pragma: no cover - """ - Return a function which gets width and height of console - (linux,osx,windows,cygwin). - """ - _environ_cols = None - if IS_WIN: - _environ_cols = _environ_cols_windows - if _environ_cols is None: - _environ_cols = _environ_cols_tput - if IS_NIX: - _environ_cols = _environ_cols_linux - return _environ_cols - - -def _environ_cols_windows(fp): # pragma: no cover - try: - from ctypes import windll, create_string_buffer - import struct - from sys import stdin, stdout - - io_handle = -12 # assume stderr - if fp == stdin: - io_handle = -10 - elif fp == stdout: - io_handle = -11 - - h = windll.kernel32.GetStdHandle(io_handle) - csbi = create_string_buffer(22) - res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) - if res: - (_bufx, _bufy, _curx, _cury, _wattr, left, _top, right, _bottom, - _maxx, _maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) - # nlines = bottom - top + 1 - return right - left # +1 - except: - pass - return None - - -def _environ_cols_tput(*_): # pragma: no cover - """cygwin xterm (windows)""" - try: - import shlex - cols = int(subprocess.check_call(shlex.split('tput cols'))) - # rows = int(subprocess.check_call(shlex.split('tput lines'))) - return cols - except: - pass - return None - - -def _environ_cols_linux(fp): # pragma: no cover - - try: - from termios import TIOCGWINSZ - from fcntl import ioctl - from array import array - except ImportError: - return None - else: - try: - return array('h', ioctl(fp, TIOCGWINSZ, '\0' * 8))[1] - except: - try: - from os.environ import get - except ImportError: - return None - else: - return int(get('COLUMNS', 1)) - 1 - - -def _term_move_up(): # pragma: no cover - return '' if (os.name == 'nt') and (colorama is None) else '\x1b[A' diff --git a/lib/tqdm/_version.py b/lib/tqdm/_version.py deleted file mode 100644 index 45bef8d7..00000000 --- a/lib/tqdm/_version.py +++ /dev/null @@ -1,59 +0,0 @@ -# Definition of the version number -import os -from io import open as io_open - -__all__ = ["__version__"] - -# major, minor, patch, -extra -version_info = 4, 21, 0 - -# Nice string for the version -__version__ = '.'.join(map(str, version_info)) - - -# auto -extra based on commit hash (if not tagged as release) -scriptdir = os.path.dirname(__file__) -gitdir = os.path.abspath(os.path.join(scriptdir, "..", ".git")) -if os.path.isdir(gitdir): # pragma: nocover - extra = None - # Open config file to check if we are in tqdm project - with io_open(os.path.join(gitdir, "config"), 'r') as fh_config: - if 'tqdm' in fh_config.read(): - # Open the HEAD file - with io_open(os.path.join(gitdir, "HEAD"), 'r') as fh_head: - extra = fh_head.readline().strip() - # in a branch => HEAD points to file containing last commit - if 'ref:' in extra: - # reference file path - ref_file = extra[5:] - branch_name = ref_file.rsplit('/', 1)[-1] - - ref_file_path = os.path.abspath(os.path.join(gitdir, ref_file)) - # check that we are in git folder - # (by stripping the git folder from the ref file path) - if os.path.relpath( - ref_file_path, gitdir).replace('\\', '/') != ref_file: - # out of git folder - extra = None - else: - # open the ref file - with io_open(ref_file_path, 'r') as fh_branch: - commit_hash = fh_branch.readline().strip() - extra = commit_hash[:8] - if branch_name != "master": - extra += '.' + branch_name - - # detached HEAD mode, already have commit hash - else: - extra = extra[:8] - - # Append commit hash (and branch) to version string if not tagged - if extra is not None: - try: - with io_open(os.path.join(gitdir, "refs", "tags", - 'v' + __version__)) as fdv: - if fdv.readline().strip()[:8] != extra[:8]: - __version__ += '-' + extra - except Exception as e: - if "No such file" not in str(e): - raise diff --git a/lib/tqdm/tests/tests_main.py b/lib/tqdm/tests/tests_main.py deleted file mode 100644 index f635f7ce..00000000 --- a/lib/tqdm/tests/tests_main.py +++ /dev/null @@ -1,94 +0,0 @@ -import sys -import subprocess -from tqdm import main, TqdmKeyError, TqdmTypeError - -from tests_tqdm import with_setup, pretest, posttest, _range, closing, \ - UnicodeIO, StringIO - - -def _sh(*cmd, **kwargs): - return subprocess.Popen(cmd, stdout=subprocess.PIPE, - **kwargs).communicate()[0].decode('utf-8') - - -# WARNING: this should be the last test as it messes with sys.stdin, argv -@with_setup(pretest, posttest) -def test_main(): - """Test command line pipes""" - ls_out = _sh('ls').replace('\r\n', '\n') - ls = subprocess.Popen('ls', stdout=subprocess.PIPE, - stderr=subprocess.STDOUT) - res = _sh(sys.executable, '-c', 'from tqdm import main; main()', - stdin=ls.stdout, stderr=subprocess.STDOUT) - ls.wait() - - # actual test: - - assert (ls_out in res.replace('\r\n', '\n')) - - # semi-fake test which gets coverage: - _SYS = sys.stdin, sys.argv - - with closing(StringIO()) as sys.stdin: - sys.argv = ['', '--desc', 'Test CLI-delims', - '--ascii', 'True', '--delim', r'\0', '--buf_size', '64'] - sys.stdin.write('\0'.join(map(str, _range(int(1e3))))) - sys.stdin.seek(0) - main() - - IN_DATA_LIST = map(str, _range(int(1e3))) - sys.stdin = IN_DATA_LIST - sys.argv = ['', '--desc', 'Test CLI pipes', - '--ascii', 'True', '--unit_scale', 'True'] - import tqdm.__main__ # NOQA - - IN_DATA = '\0'.join(IN_DATA_LIST) - with closing(StringIO()) as sys.stdin: - sys.stdin.write(IN_DATA) - sys.stdin.seek(0) - sys.argv = ['', '--ascii', '--bytes', '--unit_scale', 'False'] - with closing(UnicodeIO()) as fp: - main(fp=fp) - assert (str(len(IN_DATA)) in fp.getvalue()) - - sys.stdin = IN_DATA_LIST - sys.argv = ['', '-ascii', '--unit_scale', 'False', - '--desc', 'Test CLI errors'] - main() - - sys.argv = ['', '-ascii', '-unit_scale', '--bad_arg_u_ment', 'foo'] - try: - main() - except TqdmKeyError as e: - if 'bad_arg_u_ment' not in str(e): - raise - else: - raise TqdmKeyError('bad_arg_u_ment') - - sys.argv = ['', '-ascii', '-unit_scale', 'invalid_bool_value'] - try: - main() - except TqdmTypeError as e: - if 'invalid_bool_value' not in str(e): - raise - else: - raise TqdmTypeError('invalid_bool_value') - - sys.argv = ['', '-ascii', '--total', 'invalid_int_value'] - try: - main() - except TqdmTypeError as e: - if 'invalid_int_value' not in str(e): - raise - else: - raise TqdmTypeError('invalid_int_value') - - for i in ('-h', '--help', '-v', '--version'): - sys.argv = ['', i] - try: - main() - except SystemExit: - pass - - # clean up - sys.stdin, sys.argv = _SYS diff --git a/lib/tqdm/tests/tests_pandas.py b/lib/tqdm/tests/tests_pandas.py deleted file mode 100644 index 77ec0773..00000000 --- a/lib/tqdm/tests/tests_pandas.py +++ /dev/null @@ -1,207 +0,0 @@ -from nose.plugins.skip import SkipTest - -from tqdm import tqdm -from tests_tqdm import with_setup, pretest, posttest, StringIO, closing - - -@with_setup(pretest, posttest) -def test_pandas_series(): - """Test pandas.Series.progress_apply and .progress_map""" - try: - from numpy.random import randint - import pandas as pd - except ImportError: - raise SkipTest - - with closing(StringIO()) as our_file: - tqdm.pandas(file=our_file, leave=True, ascii=True) - - series = pd.Series(randint(0, 50, (123,))) - res1 = series.progress_apply(lambda x: x + 10) - res2 = series.apply(lambda x: x + 10) - assert res1.equals(res2) - - res3 = series.progress_map(lambda x: x + 10) - res4 = series.map(lambda x: x + 10) - assert res3.equals(res4) - - expects = ['100%', '123/123'] - for exres in expects: - our_file.seek(0) - if our_file.getvalue().count(exres) < 2: - our_file.seek(0) - raise AssertionError( - "\nExpected:\n{0}\nIn:\n{1}\n".format( - exres + " at least twice.", our_file.read())) - - -@with_setup(pretest, posttest) -def test_pandas_data_frame(): - """Test pandas.DataFrame.progress_apply and .progress_applymap""" - try: - from numpy.random import randint - import pandas as pd - except ImportError: - raise SkipTest - - with closing(StringIO()) as our_file: - tqdm.pandas(file=our_file, leave=True, ascii=True) - df = pd.DataFrame(randint(0, 50, (100, 200))) - - def task_func(x): - return x + 1 - - # applymap - res1 = df.progress_applymap(task_func) - res2 = df.applymap(task_func) - assert res1.equals(res2) - - # apply - for axis in [0, 1]: - res3 = df.progress_apply(task_func, axis=axis) - res4 = df.apply(task_func, axis=axis) - assert res3.equals(res4) - - our_file.seek(0) - if our_file.read().count('100%') < 3: - our_file.seek(0) - raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format( - '100% at least three times', our_file.read())) - - # apply_map, apply axis=0, apply axis=1 - expects = ['20000/20000', '200/200', '100/100'] - for exres in expects: - our_file.seek(0) - if our_file.getvalue().count(exres) < 1: - our_file.seek(0) - raise AssertionError( - "\nExpected:\n{0}\nIn:\n {1}\n".format( - exres + " at least once.", our_file.read())) - - -@with_setup(pretest, posttest) -def test_pandas_groupby_apply(): - """Test pandas.DataFrame.groupby(...).progress_apply""" - try: - from numpy.random import randint - import pandas as pd - except ImportError: - raise SkipTest - - with closing(StringIO()) as our_file: - tqdm.pandas(file=our_file, leave=False, ascii=True) - - df = pd.DataFrame(randint(0, 50, (500, 3))) - df.groupby(0).progress_apply(lambda x: None) - - dfs = pd.DataFrame(randint(0, 50, (500, 3)), columns=list('abc')) - dfs.groupby(['a']).progress_apply(lambda x: None) - - our_file.seek(0) - - # don't expect final output since no `leave` and - # high dynamic `miniters` - nexres = '100%|##########|' - if nexres in our_file.read(): - our_file.seek(0) - raise AssertionError("\nDid not expect:\n{0}\nIn:{1}\n".format( - nexres, our_file.read())) - - with closing(StringIO()) as our_file: - tqdm.pandas(file=our_file, leave=True, ascii=True) - - dfs = pd.DataFrame(randint(0, 50, (500, 3)), columns=list('abc')) - dfs.loc[0] = [2, 1, 1] - dfs['d'] = 100 - - expects = ['500/500', '1/1', '4/4', '2/2'] - dfs.groupby(dfs.index).progress_apply(lambda x: None) - dfs.groupby('d').progress_apply(lambda x: None) - dfs.groupby(dfs.columns, axis=1).progress_apply(lambda x: None) - dfs.groupby([2, 2, 1, 1], axis=1).progress_apply(lambda x: None) - - our_file.seek(0) - if our_file.read().count('100%') < 4: - our_file.seek(0) - raise AssertionError("\nExpected:\n{0}\nIn:\n{1}\n".format( - '100% at least four times', our_file.read())) - - for exres in expects: - our_file.seek(0) - if our_file.getvalue().count(exres) < 1: - our_file.seek(0) - raise AssertionError( - "\nExpected:\n{0}\nIn:\n {1}\n".format( - exres + " at least once.", our_file.read())) - - -@with_setup(pretest, posttest) -def test_pandas_leave(): - """Test pandas with `leave=True`""" - try: - from numpy.random import randint - import pandas as pd - except ImportError: - raise SkipTest - - with closing(StringIO()) as our_file: - df = pd.DataFrame(randint(0, 100, (1000, 6))) - tqdm.pandas(file=our_file, leave=True, ascii=True) - df.groupby(0).progress_apply(lambda x: None) - - our_file.seek(0) - - exres = '100%|##########| 100/100' - if exres not in our_file.read(): - our_file.seek(0) - raise AssertionError( - "\nExpected:\n{0}\nIn:{1}\n".format(exres, our_file.read())) - - -@with_setup(pretest, posttest) -def test_pandas_apply_args_deprecation(): - """Test warning info in - `pandas.Dataframe(Series).progress_apply(func, *args)`""" - try: - from numpy.random import randint - from tqdm import tqdm_pandas - import pandas as pd - except ImportError: - raise SkipTest - - with closing(StringIO()) as our_file: - tqdm_pandas(tqdm(file=our_file, leave=False, ascii=True, ncols=20)) - df = pd.DataFrame(randint(0, 50, (500, 3))) - df.progress_apply(lambda x: None, 1) # 1 shall cause a warning - # Check deprecation message - res = our_file.getvalue() - assert all([i in res for i in ( - "TqdmDeprecationWarning", "not supported", - "keyword arguments instead")]) - - -@with_setup(pretest, posttest) -def test_pandas_deprecation(): - """Test bar object instance as argument deprecation""" - try: - from numpy.random import randint - from tqdm import tqdm_pandas - import pandas as pd - except ImportError: - raise SkipTest - - with closing(StringIO()) as our_file: - tqdm_pandas(tqdm(file=our_file, leave=False, ascii=True, ncols=20)) - df = pd.DataFrame(randint(0, 50, (500, 3))) - df.groupby(0).progress_apply(lambda x: None) - # Check deprecation message - assert "TqdmDeprecationWarning" in our_file.getvalue() - assert "instead of `tqdm_pandas(tqdm(...))`" in our_file.getvalue() - - with closing(StringIO()) as our_file: - tqdm_pandas(tqdm, file=our_file, leave=False, ascii=True, ncols=20) - df = pd.DataFrame(randint(0, 50, (500, 3))) - df.groupby(0).progress_apply(lambda x: None) - # Check deprecation message - assert "TqdmDeprecationWarning" in our_file.getvalue() - assert "instead of `tqdm_pandas(tqdm, ...)`" in our_file.getvalue() diff --git a/lib/tqdm/tests/tests_perf.py b/lib/tqdm/tests/tests_perf.py deleted file mode 100644 index b0d238ea..00000000 --- a/lib/tqdm/tests/tests_perf.py +++ /dev/null @@ -1,336 +0,0 @@ -from __future__ import print_function, division - -from nose.plugins.skip import SkipTest - -from contextlib import contextmanager - -import sys -from time import sleep, time - -from tqdm import trange -from tqdm import tqdm - -from tests_tqdm import with_setup, pretest, posttest, StringIO, closing, _range - -# Use relative/cpu timer to have reliable timings when there is a sudden load -try: - from time import process_time -except ImportError: - from time import clock - process_time = clock - - -def get_relative_time(prevtime=0): - return process_time() - prevtime - - -def cpu_sleep(t): - """Sleep the given amount of cpu time""" - start = process_time() - while (process_time() - start) < t: - pass - - -def checkCpuTime(sleeptime=0.2): - """Check if cpu time works correctly""" - if checkCpuTime.passed: - return True - # First test that sleeping does not consume cputime - start1 = process_time() - sleep(sleeptime) - t1 = process_time() - start1 - - # secondly check by comparing to cpusleep (where we actually do something) - start2 = process_time() - cpu_sleep(sleeptime) - t2 = process_time() - start2 - - if abs(t1) < 0.0001 and (t1 < t2 / 10): - return True - raise SkipTest - - -checkCpuTime.passed = False - - -@contextmanager -def relative_timer(): - start = process_time() - - def elapser(): - return process_time() - start - - yield lambda: elapser() - spent = process_time() - start - - def elapser(): # NOQA - return spent - - -def retry_on_except(n=3): - def wrapper(fn): - def test_inner(): - for i in range(1, n + 1): - try: - checkCpuTime() - fn() - except SkipTest: - if i >= n: - raise - else: - return - - test_inner.__doc__ = fn.__doc__ - return test_inner - - return wrapper - - -class MockIO(StringIO): - """Wraps StringIO to mock a file with no I/O""" - - def write(self, data): - return - - -def simple_progress(iterable=None, total=None, file=sys.stdout, desc='', - leave=False, miniters=1, mininterval=0.1, width=60): - """Simple progress bar reproducing tqdm's major features""" - n = [0] # use a closure - start_t = [time()] - last_n = [0] - last_t = [0] - if iterable is not None: - total = len(iterable) - - def format_interval(t): - mins, s = divmod(int(t), 60) - h, m = divmod(mins, 60) - if h: - return '{0:d}:{1:02d}:{2:02d}'.format(h, m, s) - else: - return '{0:02d}:{1:02d}'.format(m, s) - - def update_and_print(i=1): - n[0] += i - if (n[0] - last_n[0]) >= miniters: - last_n[0] = n[0] - - if (time() - last_t[0]) >= mininterval: - last_t[0] = time() # last_t[0] == current time - - spent = last_t[0] - start_t[0] - spent_fmt = format_interval(spent) - rate = n[0] / spent if spent > 0 else 0 - if 0.0 < rate < 1.0: - rate_fmt = "%.2fs/it" % (1.0 / rate) - else: - rate_fmt = "%.2fit/s" % rate - - frac = n[0] / total - percentage = int(frac * 100) - eta = (total - n[0]) / rate if rate > 0 else 0 - eta_fmt = format_interval(eta) - - # bar = "#" * int(frac * width) - barfill = " " * int((1.0 - frac) * width) - bar_length, frac_bar_length = divmod(int(frac * width * 10), 10) - bar = '#' * bar_length - frac_bar = chr(48 + frac_bar_length) if frac_bar_length \ - else ' ' - - file.write("\r%s %i%%|%s%s%s| %i/%i [%s<%s, %s]" % - (desc, percentage, bar, frac_bar, barfill, n[0], - total, spent_fmt, eta_fmt, rate_fmt)) - - if n[0] == total and leave: - file.write("\n") - file.flush() - - def update_and_yield(): - for elt in iterable: - yield elt - update_and_print() - - update_and_print(0) - if iterable is not None: - return update_and_yield() - else: - return update_and_print - - -@with_setup(pretest, posttest) -@retry_on_except() -def test_iter_overhead(): - """Test overhead of iteration based tqdm""" - - total = int(1e6) - - with closing(MockIO()) as our_file: - a = 0 - with trange(total, file=our_file) as t: - with relative_timer() as time_tqdm: - for i in t: - a += i - assert (a == (total * total - total) / 2.0) - - a = 0 - with relative_timer() as time_bench: - for i in _range(total): - a += i - our_file.write(a) - - # Compute relative overhead of tqdm against native range() - if time_tqdm() > 9 * time_bench(): - raise AssertionError('trange(%g): %f, range(%g): %f' % - (total, time_tqdm(), total, time_bench())) - - -@with_setup(pretest, posttest) -@retry_on_except() -def test_manual_overhead(): - """Test overhead of manual tqdm""" - - total = int(1e6) - - with closing(MockIO()) as our_file: - with tqdm(total=total * 10, file=our_file, leave=True) as t: - a = 0 - with relative_timer() as time_tqdm: - for i in _range(total): - a += i - t.update(10) - - a = 0 - with relative_timer() as time_bench: - for i in _range(total): - a += i - our_file.write(a) - - # Compute relative overhead of tqdm against native range() - if time_tqdm() > 10 * time_bench(): - raise AssertionError('tqdm(%g): %f, range(%g): %f' % - (total, time_tqdm(), total, time_bench())) - - -@with_setup(pretest, posttest) -@retry_on_except() -def test_iter_overhead_hard(): - """Test overhead of iteration based tqdm (hard)""" - - total = int(1e5) - - with closing(MockIO()) as our_file: - a = 0 - with trange(total, file=our_file, leave=True, miniters=1, - mininterval=0, maxinterval=0) as t: - with relative_timer() as time_tqdm: - for i in t: - a += i - assert (a == (total * total - total) / 2.0) - - a = 0 - with relative_timer() as time_bench: - for i in _range(total): - a += i - our_file.write(("%i" % a) * 40) - - # Compute relative overhead of tqdm against native range() - try: - assert (time_tqdm() < 60 * time_bench()) - except AssertionError: - raise AssertionError('trange(%g): %f, range(%g): %f' % - (total, time_tqdm(), total, time_bench())) - - -@with_setup(pretest, posttest) -@retry_on_except() -def test_manual_overhead_hard(): - """Test overhead of manual tqdm (hard)""" - - total = int(1e5) - - with closing(MockIO()) as our_file: - t = tqdm(total=total * 10, file=our_file, leave=True, miniters=1, - mininterval=0, maxinterval=0) - a = 0 - with relative_timer() as time_tqdm: - for i in _range(total): - a += i - t.update(10) - - a = 0 - with relative_timer() as time_bench: - for i in _range(total): - a += i - our_file.write(("%i" % a) * 40) - - # Compute relative overhead of tqdm against native range() - try: - assert (time_tqdm() < 100 * time_bench()) - except AssertionError: - raise AssertionError('tqdm(%g): %f, range(%g): %f' % - (total, time_tqdm(), total, time_bench())) - - -@with_setup(pretest, posttest) -@retry_on_except() -def test_iter_overhead_simplebar_hard(): - """Test overhead of iteration based tqdm vs simple progress bar (hard)""" - - total = int(1e4) - - with closing(MockIO()) as our_file: - a = 0 - with trange(total, file=our_file, leave=True, miniters=1, - mininterval=0, maxinterval=0) as t: - with relative_timer() as time_tqdm: - for i in t: - a += i - assert (a == (total * total - total) / 2.0) - - a = 0 - s = simple_progress(_range(total), file=our_file, leave=True, - miniters=1, mininterval=0) - with relative_timer() as time_bench: - for i in s: - a += i - - # Compute relative overhead of tqdm against native range() - try: - assert (time_tqdm() < 2.5 * time_bench()) - except AssertionError: - raise AssertionError('trange(%g): %f, simple_progress(%g): %f' % - (total, time_tqdm(), total, time_bench())) - - -@with_setup(pretest, posttest) -@retry_on_except() -def test_manual_overhead_simplebar_hard(): - """Test overhead of manual tqdm vs simple progress bar (hard)""" - - total = int(1e4) - - with closing(MockIO()) as our_file: - t = tqdm(total=total * 10, file=our_file, leave=True, miniters=1, - mininterval=0, maxinterval=0) - a = 0 - with relative_timer() as time_tqdm: - for i in _range(total): - a += i - t.update(10) - - simplebar_update = simple_progress( - total=total, file=our_file, leave=True, miniters=1, mininterval=0) - a = 0 - with relative_timer() as time_bench: - for i in _range(total): - a += i - simplebar_update(10) - - # Compute relative overhead of tqdm against native range() - try: - assert (time_tqdm() < 2.5 * time_bench()) - except AssertionError: - raise AssertionError('tqdm(%g): %f, simple_progress(%g): %f' % - (total, time_tqdm(), total, time_bench())) diff --git a/lib/tqdm/tests/tests_synchronisation.py b/lib/tqdm/tests/tests_synchronisation.py deleted file mode 100644 index 09da4ee5..00000000 --- a/lib/tqdm/tests/tests_synchronisation.py +++ /dev/null @@ -1,164 +0,0 @@ -from __future__ import division -from tqdm import tqdm -from tests_tqdm import with_setup, pretest, posttest, StringIO, closing -from tests_tqdm import DiscreteTimer, cpu_timify - -from time import sleep -from threading import Event -from tqdm import TMonitor - - -class FakeSleep(object): - """Wait until the discrete timer reached the required time""" - def __init__(self, dtimer): - self.dtimer = dtimer - - def sleep(self, t): - end = t + self.dtimer.t - while self.dtimer.t < end: - sleep(0.0000001) # sleep a bit to interrupt (instead of pass) - - -class FakeTqdm(object): - _instances = [] - - -def make_create_fake_sleep_event(sleep): - def wait(self, timeout=None): - if timeout is not None: - sleep(timeout) - return self.is_set() - - def create_fake_sleep_event(): - event = Event() - event.wait = wait - return event - - return create_fake_sleep_event - - -@with_setup(pretest, posttest) -def test_monitor_thread(): - """Test dummy monitoring thread""" - maxinterval = 10 - - # Setup a discrete timer - timer = DiscreteTimer() - TMonitor._time = timer.time - # And a fake sleeper - sleeper = FakeSleep(timer) - TMonitor._event = make_create_fake_sleep_event(sleeper.sleep) - - # Instanciate the monitor - monitor = TMonitor(FakeTqdm, maxinterval) - # Test if alive, then killed - assert monitor.report() - monitor.exit() - timer.sleep(maxinterval * 2) # need to go out of the sleep to die - assert not monitor.report() - # assert not monitor.is_alive() # not working dunno why, thread not killed - del monitor - - -@with_setup(pretest, posttest) -def test_monitoring_and_cleanup(): - """Test for stalled tqdm instance and monitor deletion""" - # Note: should fix miniters for these tests, else with dynamic_miniters - # it's too complicated to handle with monitoring update and maxinterval... - maxinterval = 2 - - total = 1000 - # Setup a discrete timer - timer = DiscreteTimer() - # And a fake sleeper - sleeper = FakeSleep(timer) - # Setup TMonitor to use the timer - TMonitor._time = timer.time - TMonitor._event = make_create_fake_sleep_event(sleeper.sleep) - # Set monitor interval - tqdm.monitor_interval = maxinterval - with closing(StringIO()) as our_file: - with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, - maxinterval=maxinterval) as t: - cpu_timify(t, timer) - # Do a lot of iterations in a small timeframe - # (smaller than monitor interval) - timer.sleep(maxinterval / 2) # monitor won't wake up - t.update(500) - # check that our fixed miniters is still there - assert t.miniters == 500 - # Then do 1 it after monitor interval, so that monitor kicks in - timer.sleep(maxinterval * 2) - t.update(1) - # Wait for the monitor to get out of sleep's loop and update tqdm.. - timeend = timer.time() - while not (t.monitor.woken >= timeend and t.miniters == 1): - timer.sleep(1) # Force monitor to wake up if it woken too soon - sleep(0.000001) # sleep to allow interrupt (instead of pass) - assert t.miniters == 1 # check that monitor corrected miniters - # Note: at this point, there may be a race condition: monitor saved - # current woken time but timer.sleep() happen just before monitor - # sleep. To fix that, either sleep here or increase time in a loop - # to ensure that monitor wakes up at some point. - - # Try again but already at miniters = 1 so nothing will be done - timer.sleep(maxinterval * 2) - t.update(2) - timeend = timer.time() - while not (t.monitor.woken >= timeend): - timer.sleep(1) # Force monitor to wake up if it woken too soon - sleep(0.000001) - # Wait for the monitor to get out of sleep's loop and update tqdm.. - assert t.miniters == 1 # check that monitor corrected miniters - - # Check that class var monitor is deleted if no instance left - tqdm.monitor_interval = 10 - assert tqdm.monitor is None - - -@with_setup(pretest, posttest) -def test_monitoring_multi(): - """Test on multiple bars, one not needing miniters adjustment""" - # Note: should fix miniters for these tests, else with dynamic_miniters - # it's too complicated to handle with monitoring update and maxinterval... - maxinterval = 2 - - total = 1000 - # Setup a discrete timer - timer = DiscreteTimer() - # And a fake sleeper - sleeper = FakeSleep(timer) - # Setup TMonitor to use the timer - TMonitor._time = timer.time - TMonitor._event = make_create_fake_sleep_event(sleeper.sleep) - # Set monitor interval - tqdm.monitor_interval = maxinterval - with closing(StringIO()) as our_file: - with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, - maxinterval=maxinterval) as t1: - # Set high maxinterval for t2 so monitor does not need to adjust it - with tqdm(total=total, file=our_file, miniters=500, mininterval=0.1, - maxinterval=1E5) as t2: - cpu_timify(t1, timer) - cpu_timify(t2, timer) - # Do a lot of iterations in a small timeframe - timer.sleep(maxinterval / 2) - t1.update(500) - t2.update(500) - assert t1.miniters == 500 - assert t2.miniters == 500 - # Then do 1 it after monitor interval, so that monitor kicks in - timer.sleep(maxinterval * 2) - t1.update(1) - t2.update(1) - # Wait for the monitor to get out of sleep and update tqdm - timeend = timer.time() - while not (t1.monitor.woken >= timeend and t1.miniters == 1): - timer.sleep(1) - sleep(0.000001) - assert t1.miniters == 1 # check that monitor corrected miniters - assert t2.miniters == 500 # check that t2 was not adjusted - - # Check that class var monitor is deleted if no instance left - tqdm.monitor_interval = 10 - assert tqdm.monitor is None diff --git a/lib/tqdm/tests/tests_tqdm.py b/lib/tqdm/tests/tests_tqdm.py deleted file mode 100644 index 89640d7f..00000000 --- a/lib/tqdm/tests/tests_tqdm.py +++ /dev/null @@ -1,1541 +0,0 @@ -# Advice: use repr(our_file.read()) to print the full output of tqdm -# (else '\r' will replace the previous lines and you'll see only the latest. - -from __future__ import unicode_literals - -import sys -import csv -import re -import os -from nose import with_setup -from nose.plugins.skip import SkipTest -from nose.tools import assert_raises -from contextlib import contextmanager - -from tqdm import tqdm -from tqdm import trange -from tqdm import TqdmDeprecationWarning - -try: - from StringIO import StringIO -except ImportError: - from io import StringIO - -from io import IOBase # to support unicode strings - - -class DeprecationError(Exception): - pass - - -# Ensure we can use `with closing(...) as ... :` syntax -if getattr(StringIO, '__exit__', False) and \ - getattr(StringIO, '__enter__', False): - def closing(arg): - return arg -else: - from contextlib import closing - -try: - _range = xrange -except NameError: - _range = range - -try: - _unicode = unicode -except NameError: - _unicode = str - -nt_and_no_colorama = False -if os.name == 'nt': - try: - import colorama # NOQA - except ImportError: - nt_and_no_colorama = True - -# Regex definitions -# List of control characters -CTRLCHR = [r'\r', r'\n', r'\x1b\[A'] # Need to escape [ for regex -# Regular expressions compilation -RE_rate = re.compile(r'(\d+\.\d+)it/s') -RE_ctrlchr = re.compile("(%s)" % '|'.join(CTRLCHR)) # Match control chars -RE_ctrlchr_excl = re.compile('|'.join(CTRLCHR)) # Match and exclude ctrl chars -RE_pos = re.compile( - r'((\x1b\[A|\r|\n)+((pos\d+) bar:\s+\d+%|\s{3,6})?)') # NOQA - - -class DiscreteTimer(object): - """Virtual discrete time manager, to precisely control time for tests""" - - def __init__(self): - self.t = 0.0 - - def sleep(self, t): - """Sleep = increment the time counter (almost no CPU used)""" - self.t += t - - def time(self): - """Get the current time""" - return self.t - - -def cpu_timify(t, timer=None): - """Force tqdm to use the specified timer instead of system-wide time()""" - if timer is None: - timer = DiscreteTimer() - t._time = timer.time - t._sleep = timer.sleep - t.start_t = t.last_print_t = t._time() - return timer - - -def pretest(): - # setcheckinterval is deprecated - getattr(sys, 'setswitchinterval', getattr(sys, 'setcheckinterval'))(100) - - if getattr(tqdm, "_instances", False): - n = len(tqdm._instances) - if n: - tqdm._instances.clear() - raise EnvironmentError( - "{0} `tqdm` instances still in existence PRE-test".format(n)) - - -def posttest(): - if getattr(tqdm, "_instances", False): - n = len(tqdm._instances) - if n: - tqdm._instances.clear() - raise EnvironmentError( - "{0} `tqdm` instances still in existence POST-test".format(n)) - - -class UnicodeIO(IOBase): - """Unicode version of StringIO""" - - def __init__(self, *args, **kwargs): - super(UnicodeIO, self).__init__(*args, **kwargs) - self.encoding = 'U8' # io.StringIO supports unicode, but no encoding - self.text = '' - self.cursor = 0 - - def __len__(self): - return len(self.text) - - def seek(self, offset): - self.cursor = offset - - def tell(self): - return self.cursor - - def write(self, s): - self.text = self.text[:self.cursor] + s + \ - self.text[self.cursor + len(s):] - self.cursor += len(s) - - def read(self, n=-1): - _cur = self.cursor - self.cursor = len(self) if n < 0 \ - else min(_cur + n, len(self)) - return self.text[_cur:self.cursor] - - def getvalue(self): - return self.text - - -def get_bar(all_bars, i): - """Get a specific update from a whole bar traceback""" - # Split according to any used control characters - bars_split = RE_ctrlchr_excl.split(all_bars) - bars_split = list(filter(None, bars_split)) # filter out empty splits - return bars_split[i] - - -def progressbar_rate(bar_str): - return float(RE_rate.search(bar_str).group(1)) - - -def squash_ctrlchars(s): - """Apply control characters in a string just like a terminal display""" - # List of supported control codes - ctrlcodes = [r'\r', r'\n', r'\x1b\[A'] - - # Init variables - curline = 0 # current line in our fake terminal - lines = [''] # state of our fake terminal - - # Split input string by control codes - RE_ctrl = re.compile("(%s)" % ("|".join(ctrlcodes)), flags=re.DOTALL) - s_split = RE_ctrl.split(s) - s_split = filter(None, s_split) # filter out empty splits - - # For each control character or message - for nextctrl in s_split: - # If it's a control character, apply it - if nextctrl == '\r': - # Carriage return - # Go to the beginning of the line - # simplified here: we just empty the string - lines[curline] = '' - elif nextctrl == '\n': - # Newline - # Go to the next line - if curline < (len(lines) - 1): - # If already exists, just move cursor - curline += 1 - else: - # Else the new line is created - lines.append('') - curline += 1 - elif nextctrl == '\x1b[A': - # Move cursor up - if curline > 0: - curline -= 1 - else: - raise ValueError("Cannot go up, anymore!") - # Else, it is a message, we print it on current line - else: - lines[curline] += nextctrl - - return lines - - -def test_format_interval(): - """Test time interval format""" - format_interval = tqdm.format_interval - - assert format_interval(60) == '01:00' - assert format_interval(6160) == '1:42:40' - assert format_interval(238113) == '66:08:33' - - -def test_format_meter(): - """Test statistics and progress bar formatting""" - try: - unich = unichr - except NameError: - unich = chr - - format_meter = tqdm.format_meter - - assert format_meter(0, 1000, 13) == \ - " 0%| | 0/1000 [00:13= (bigstep - 1) and \ - ((i - (bigstep - 1)) % smallstep) == 0: - timer.sleep(1e-2) - if i >= 3 * bigstep: - break - - our_file.seek(0) - assert "15%" in our_file.read() - - # Test different behavior with and without mininterval - timer = DiscreteTimer() - total = 1000 - mininterval = 0.1 - maxinterval = 10 - with closing(StringIO()) as our_file: - with tqdm(total=total, file=our_file, miniters=None, smoothing=1, - mininterval=mininterval, maxinterval=maxinterval) as tm1: - with tqdm(total=total, file=our_file, miniters=None, smoothing=1, - mininterval=0, maxinterval=maxinterval) as tm2: - - cpu_timify(tm1, timer) - cpu_timify(tm2, timer) - - # Fast iterations, check if dynamic_miniters triggers - timer.sleep(mininterval) # to force update for t1 - tm1.update(total / 2) - tm2.update(total / 2) - assert int(tm1.miniters) == tm2.miniters == total / 2 - - # Slow iterations, check different miniters if mininterval - timer.sleep(maxinterval * 2) - tm1.update(total / 2) - tm2.update(total / 2) - res = [tm1.miniters, tm2.miniters] - assert res == [(total / 2) * mininterval / (maxinterval * 2), - (total / 2) * maxinterval / (maxinterval * 2)] - - # Same with iterable based tqdm - timer1 = DiscreteTimer() # need 2 timers for each bar because zip not work - timer2 = DiscreteTimer() - total = 100 - mininterval = 0.1 - maxinterval = 10 - with closing(StringIO()) as our_file: - t1 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, - mininterval=mininterval, maxinterval=maxinterval) - t2 = tqdm(_range(total), file=our_file, miniters=None, smoothing=1, - mininterval=0, maxinterval=maxinterval) - - cpu_timify(t1, timer1) - cpu_timify(t2, timer2) - - for i in t1: - if i == ((total / 2) - 2): - timer1.sleep(mininterval) - if i == (total - 1): - timer1.sleep(maxinterval * 2) - - for i in t2: - if i == ((total / 2) - 2): - timer2.sleep(mininterval) - if i == (total - 1): - timer2.sleep(maxinterval * 2) - - assert t1.miniters == 0.255 - assert t2.miniters == 0.5 - - t1.close() - t2.close() - - -@with_setup(pretest, posttest) -def test_min_iters(): - """Test miniters""" - with closing(StringIO()) as our_file: - for _ in tqdm(_range(3), file=our_file, leave=True, miniters=4): - our_file.write('blank\n') - our_file.seek(0) - assert '\nblank\nblank\n' in our_file.read() - - with closing(StringIO()) as our_file: - for _ in tqdm(_range(3), file=our_file, leave=True, miniters=1): - our_file.write('blank\n') - our_file.seek(0) - # assume automatic mininterval = 0 means intermediate output - assert '| 3/3 ' in our_file.read() - - -@with_setup(pretest, posttest) -def test_dynamic_min_iters(): - """Test purely dynamic miniters (and manual updates and __del__)""" - with closing(StringIO()) as our_file: - total = 10 - t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, - smoothing=1) - - t.update() - # Increase 3 iterations - t.update(3) - # The next two iterations should be skipped because of dynamic_miniters - t.update() - t.update() - # The third iteration should be displayed - t.update() - - our_file.seek(0) - out = our_file.read() - assert t.dynamic_miniters - t.__del__() # simulate immediate del gc - - assert ' 0%| | 0/10 [00:00<' in out - assert '40%' in out - assert '50%' not in out - assert '60%' not in out - assert '70%' in out - - # Check with smoothing=0, miniters should be set to max update seen so far - with closing(StringIO()) as our_file: - total = 10 - t = tqdm(total=total, file=our_file, miniters=None, mininterval=0, - smoothing=0) - - t.update() - t.update(2) - t.update(5) # this should be stored as miniters - t.update(1) - - our_file.seek(0) - out = our_file.read() - assert all(i in out for i in ("0/10", "1/10", "3/10")) - assert "2/10" not in out - assert t.dynamic_miniters and not t.smoothing - assert t.miniters == 5 - t.close() - - # Check iterable based tqdm - with closing(StringIO()) as our_file: - t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, - smoothing=0.5) - for _ in t: - pass - assert t.dynamic_miniters - - # No smoothing - with closing(StringIO()) as our_file: - t = tqdm(_range(10), file=our_file, miniters=None, mininterval=None, - smoothing=0) - for _ in t: - pass - assert t.dynamic_miniters - - # No dynamic_miniters (miniters is fixed manually) - with closing(StringIO()) as our_file: - t = tqdm(_range(10), file=our_file, miniters=1, mininterval=None) - for _ in t: - pass - assert not t.dynamic_miniters - - -@with_setup(pretest, posttest) -def test_big_min_interval(): - """Test large mininterval""" - with closing(StringIO()) as our_file: - for _ in tqdm(_range(2), file=our_file, mininterval=1E10): - pass - our_file.seek(0) - assert '50%' not in our_file.read() - - with closing(StringIO()) as our_file: - with tqdm(_range(2), file=our_file, mininterval=1E10) as t: - t.update() - t.update() - our_file.seek(0) - assert '50%' not in our_file.read() - - -@with_setup(pretest, posttest) -def test_smoothed_dynamic_min_iters(): - """Test smoothed dynamic miniters""" - timer = DiscreteTimer() - - with closing(StringIO()) as our_file: - with tqdm(total=100, file=our_file, miniters=None, mininterval=0, - smoothing=0.5, maxinterval=0) as t: - cpu_timify(t, timer) - - # Increase 10 iterations at once - t.update(10) - # The next iterations should be partially skipped - for _ in _range(2): - t.update(4) - for _ in _range(20): - t.update() - - our_file.seek(0) - out = our_file.read() - assert t.dynamic_miniters - assert ' 0%| | 0/100 [00:00<' in out - assert '10%' in out - assert '14%' not in out - assert '18%' in out - assert '20%' not in out - assert '25%' in out - assert '30%' not in out - assert '32%' in out - - -@with_setup(pretest, posttest) -def test_smoothed_dynamic_min_iters_with_min_interval(): - """Test smoothed dynamic miniters with mininterval""" - timer = DiscreteTimer() - - # In this test, `miniters` should gradually decline - total = 100 - - with closing(StringIO()) as our_file: - # Test manual updating tqdm - with tqdm(total=total, file=our_file, miniters=None, mininterval=1e-3, - smoothing=1, maxinterval=0) as t: - cpu_timify(t, timer) - - t.update(10) - timer.sleep(1e-2) - for _ in _range(4): - t.update() - timer.sleep(1e-2) - our_file.seek(0) - out = our_file.read() - assert t.dynamic_miniters - - with closing(StringIO()) as our_file: - # Test iteration-based tqdm - with tqdm(_range(total), file=our_file, miniters=None, - mininterval=0.01, smoothing=1, maxinterval=0) as t2: - cpu_timify(t2, timer) - - for i in t2: - if i >= 10: - timer.sleep(0.1) - if i >= 14: - break - our_file.seek(0) - out2 = our_file.read() - - assert t.dynamic_miniters - assert ' 0%| | 0/100 [00:00<' in out - assert '11%' in out and '11%' in out2 - # assert '12%' not in out and '12%' in out2 - assert '13%' in out and '13%' in out2 - assert '14%' in out and '14%' in out2 - - -@with_setup(pretest, posttest) -def test_disable(): - """Test disable""" - with closing(StringIO()) as our_file: - for _ in tqdm(_range(3), file=our_file, disable=True): - pass - our_file.seek(0) - assert our_file.read() == '' - - with closing(StringIO()) as our_file: - progressbar = tqdm(total=3, file=our_file, miniters=1, disable=True) - progressbar.update(3) - progressbar.close() - our_file.seek(0) - assert our_file.read() == '' - - -@with_setup(pretest, posttest) -def test_unit(): - """Test SI unit prefix""" - with closing(StringIO()) as our_file: - for _ in tqdm(_range(3), file=our_file, miniters=1, unit="bytes"): - pass - our_file.seek(0) - assert 'bytes/s' in our_file.read() - - -@with_setup(pretest, posttest) -def test_ascii(): - """Test ascii/unicode bar""" - # Test ascii autodetection - with closing(StringIO()) as our_file: - with tqdm(total=10, file=our_file, ascii=None) as t: - assert t.ascii # TODO: this may fail in the future - - # Test ascii bar - with closing(StringIO()) as our_file: - for _ in tqdm(_range(3), total=15, file=our_file, miniters=1, - mininterval=0, ascii=True): - pass - our_file.seek(0) - res = our_file.read().strip("\r").split("\r") - assert '7%|6' in res[1] - assert '13%|#3' in res[2] - assert '20%|##' in res[3] - - # Test unicode bar - with closing(UnicodeIO()) as our_file: - with tqdm(total=15, file=our_file, ascii=False, mininterval=0) as t: - for _ in _range(3): - t.update() - our_file.seek(0) - res = our_file.read().strip("\r").split("\r") - assert "7%|\u258b" in res[1] - assert "13%|\u2588\u258e" in res[2] - assert "20%|\u2588\u2588" in res[3] - - -@with_setup(pretest, posttest) -def test_update(): - """Test manual creation and updates""" - res = None - with closing(StringIO()) as our_file: - with tqdm(total=2, file=our_file, miniters=1, mininterval=0) \ - as progressbar: - assert len(progressbar) == 2 - progressbar.update(2) - our_file.seek(0) - assert '| 2/2' in our_file.read() - progressbar.desc = 'dynamically notify of 4 increments in total' - progressbar.total = 4 - try: - progressbar.update(-10) - except ValueError as e: - if str(e) != "n (-10) cannot be negative": - raise - progressbar.update() # should default to +1 - else: - raise ValueError("Should not support negative updates") - our_file.seek(0) - res = our_file.read() - assert '| 3/4 ' in res - assert 'dynamically notify of 4 increments in total' in res - - -@with_setup(pretest, posttest) -def test_close(): - """Test manual creation and closure and n_instances""" - - # With `leave` option - with closing(StringIO()) as our_file: - progressbar = tqdm(total=3, file=our_file, miniters=10) - progressbar.update(3) - assert '| 3/3 ' not in our_file.getvalue() # Should be blank - assert len(tqdm._instances) == 1 - progressbar.close() - assert len(tqdm._instances) == 0 - assert '| 3/3 ' in our_file.getvalue() - - # Without `leave` option - with closing(StringIO()) as our_file: - progressbar = tqdm(total=3, file=our_file, miniters=10, leave=False) - progressbar.update(3) - progressbar.close() - assert '| 3/3 ' not in our_file.getvalue() # Should be blank - - # With all updates - with closing(StringIO()) as our_file: - assert len(tqdm._instances) == 0 - with tqdm(total=3, file=our_file, miniters=0, mininterval=0, - leave=True) as progressbar: - assert len(tqdm._instances) == 1 - progressbar.update(3) - res = our_file.getvalue() - assert '| 3/3 ' in res # Should be blank - # close() called - assert len(tqdm._instances) == 0 - our_file.seek(0) - - exres = res + '\n' - if exres != our_file.read(): - our_file.seek(0) - raise AssertionError( - "\nExpected:\n{0}\nGot:{1}\n".format(exres, our_file.read())) - - # Closing after the output stream has closed - with closing(StringIO()) as our_file: - t = tqdm(total=2, file=our_file) - t.update() - t.update() - t.close() - - -@with_setup(pretest, posttest) -def test_smoothing(): - """Test exponential weighted average smoothing""" - timer = DiscreteTimer() - - # -- Test disabling smoothing - with closing(StringIO()) as our_file: - with tqdm(_range(3), file=our_file, smoothing=None, leave=True) as t: - cpu_timify(t, timer) - - for _ in t: - pass - our_file.seek(0) - assert '| 3/3 ' in our_file.read() - - # -- Test smoothing - # Compile the regex to find the rate - # 1st case: no smoothing (only use average) - with closing(StringIO()) as our_file2: - with closing(StringIO()) as our_file: - t = tqdm(_range(3), file=our_file2, smoothing=None, leave=True, - miniters=1, mininterval=0) - cpu_timify(t, timer) - - with tqdm(_range(3), file=our_file, smoothing=None, leave=True, - miniters=1, mininterval=0) as t2: - cpu_timify(t2, timer) - - for i in t2: - # Sleep more for first iteration and - # see how quickly rate is updated - if i == 0: - timer.sleep(0.01) - else: - # Need to sleep in all iterations - # to calculate smoothed rate - # (else delta_t is 0!) - timer.sleep(0.001) - t.update() - n_old = len(tqdm._instances) - t.close() - assert len(tqdm._instances) == n_old - 1 - # Get result for iter-based bar - a = progressbar_rate(get_bar(our_file.getvalue(), 3)) - # Get result for manually updated bar - a2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) - - # 2nd case: use max smoothing (= instant rate) - with closing(StringIO()) as our_file2: - with closing(StringIO()) as our_file: - t = tqdm(_range(3), file=our_file2, smoothing=1, leave=True, - miniters=1, mininterval=0) - cpu_timify(t, timer) - - with tqdm(_range(3), file=our_file, smoothing=1, leave=True, - miniters=1, mininterval=0) as t2: - cpu_timify(t2, timer) - - for i in t2: - if i == 0: - timer.sleep(0.01) - else: - timer.sleep(0.001) - t.update() - t.close() - # Get result for iter-based bar - b = progressbar_rate(get_bar(our_file.getvalue(), 3)) - # Get result for manually updated bar - b2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) - - # 3rd case: use medium smoothing - with closing(StringIO()) as our_file2: - with closing(StringIO()) as our_file: - t = tqdm(_range(3), file=our_file2, smoothing=0.5, leave=True, - miniters=1, mininterval=0) - cpu_timify(t, timer) - - t2 = tqdm(_range(3), file=our_file, smoothing=0.5, leave=True, - miniters=1, mininterval=0) - cpu_timify(t2, timer) - - for i in t2: - if i == 0: - timer.sleep(0.01) - else: - timer.sleep(0.001) - t.update() - t2.close() - t.close() - # Get result for iter-based bar - c = progressbar_rate(get_bar(our_file.getvalue(), 3)) - # Get result for manually updated bar - c2 = progressbar_rate(get_bar(our_file2.getvalue(), 3)) - - # Check that medium smoothing's rate is between no and max smoothing rates - assert a <= c <= b - assert a2 <= c2 <= b2 - - -@with_setup(pretest, posttest) -def test_deprecated_nested(): - """Test nested progress bars""" - if nt_and_no_colorama: - raise SkipTest - # TODO: test degradation on windows without colorama? - - # Artificially test nested loop printing - # Without leave - our_file = StringIO() - try: - tqdm(total=2, file=our_file, nested=True) - except TqdmDeprecationWarning: - if """`nested` is deprecated and automated.\ - Use position instead for manual control.""" not in our_file.getvalue(): - raise - else: - raise DeprecationError("Should not allow nested kwarg") - - -@with_setup(pretest, posttest) -def test_bar_format(): - """Test custom bar formatting""" - with closing(StringIO()) as our_file: - bar_format = r'{l_bar}{bar}|{n_fmt}/{total_fmt}-{n}/{total}{percentage}{rate}{rate_fmt}{elapsed}{remaining}' # NOQA - for _ in trange(2, file=our_file, leave=True, bar_format=bar_format): - pass - out = our_file.getvalue() - assert "\r 0%| |0/2-0/20.0None?it/s00:00?\r" in out - - # Test unicode string auto conversion - with closing(StringIO()) as our_file: - bar_format = r'hello world' - with tqdm(ascii=False, bar_format=bar_format, file=our_file) as t: - assert isinstance(t.bar_format, _unicode) - - -@with_setup(pretest, posttest) -def test_unpause(): - """Test unpause""" - timer = DiscreteTimer() - with closing(StringIO()) as our_file: - t = trange(10, file=our_file, leave=True, mininterval=0) - cpu_timify(t, timer) - timer.sleep(0.01) - t.update() - timer.sleep(0.01) - t.update() - timer.sleep(0.1) # longer wait time - t.unpause() - timer.sleep(0.01) - t.update() - timer.sleep(0.01) - t.update() - t.close() - r_before = progressbar_rate(get_bar(our_file.getvalue(), 2)) - r_after = progressbar_rate(get_bar(our_file.getvalue(), 3)) - assert r_before == r_after - - -@with_setup(pretest, posttest) -def test_position(): - """Test positioned progress bars""" - if nt_and_no_colorama: - raise SkipTest - - # Artificially test nested loop printing - # Without leave - our_file = StringIO() - kwargs = dict(file=our_file, miniters=1, mininterval=0, maxinterval=0) - t = tqdm(total=2, desc='pos2 bar', leave=False, position=2, **kwargs) - t.update() - t.close() - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_pos.findall(out)] - exres = ['\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', - '\x1b[A\x1b[A\n\n\r ', - '\x1b[A\x1b[A'] - if res != exres: - raise AssertionError("\nExpected:\n{0}\nGot:\n{1}\nRaw:\n{2}\n".format( - str(exres), str(res), str([out]))) - - # Test iteration-based tqdm positioning - our_file = StringIO() - kwargs["file"] = our_file - for _ in trange(2, desc='pos0 bar', position=0, **kwargs): - for _ in trange(2, desc='pos1 bar', position=1, **kwargs): - for _ in trange(2, desc='pos2 bar', position=2, **kwargs): - pass - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_pos.findall(out)] - exres = ['\rpos0 bar: 0%', - '\n\rpos1 bar: 0%', - '\x1b[A\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', - '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 50%', - '\x1b[A\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', - '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 100%', - '\x1b[A\n\x1b[A\rpos0 bar: 50%', - '\n\rpos1 bar: 0%', - '\x1b[A\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', - '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 50%', - '\x1b[A\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 50%', - '\x1b[A\x1b[A\n\n\rpos2 bar: 100%', - '\x1b[A\x1b[A\n\n\x1b[A\x1b[A\n\rpos1 bar: 100%', - '\x1b[A\n\x1b[A\rpos0 bar: 100%', - '\n'] - if res != exres: - raise AssertionError("\nExpected:\n{0}\nGot:\n{1}\nRaw:\n{2}\n".format( - str(exres), str(res), str([out]))) - - # Test manual tqdm positioning - our_file = StringIO() - kwargs["file"] = our_file - kwargs["total"] = 2 - t1 = tqdm(desc='pos0 bar', position=0, **kwargs) - t2 = tqdm(desc='pos1 bar', position=1, **kwargs) - t3 = tqdm(desc='pos2 bar', position=2, **kwargs) - for _ in _range(2): - t1.update() - t3.update() - t2.update() - our_file.seek(0) - out = our_file.read() - res = [m[0] for m in RE_pos.findall(out)] - exres = ['\rpos0 bar: 0%', - '\n\rpos1 bar: 0%', - '\x1b[A\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A\rpos0 bar: 50%', - '\n\n\rpos2 bar: 50%', - '\x1b[A\x1b[A\n\rpos1 bar: 50%', - '\x1b[A\rpos0 bar: 100%', - '\n\n\rpos2 bar: 100%', - '\x1b[A\x1b[A\n\rpos1 bar: 100%', - '\x1b[A'] - if res != exres: - raise AssertionError("\nExpected:\n{0}\nGot:\n{1}\nRaw:\n{2}\n".format( - str(exres), str(res), str([out]))) - t1.close() - t2.close() - t3.close() - - # Test auto repositionning of bars when a bar is prematurely closed - # tqdm._instances.clear() # reset number of instances - with closing(StringIO()) as our_file: - t1 = tqdm(total=10, file=our_file, desc='pos0 bar', mininterval=0) - t2 = tqdm(total=10, file=our_file, desc='pos1 bar', mininterval=0) - t3 = tqdm(total=10, file=our_file, desc='pos2 bar', mininterval=0) - res = [m[0] for m in RE_pos.findall(our_file.getvalue())] - exres = ['\rpos0 bar: 0%', - '\n\rpos1 bar: 0%', - '\x1b[A\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A'] - if res != exres: - raise AssertionError( - "\nExpected:\n{0}\nGot:\n{1}\n".format(str(exres), str(res))) - - t2.close() - t4 = tqdm(total=10, file=our_file, desc='pos3 bar', mininterval=0) - t1.update(1) - t3.update(1) - t4.update(1) - res = [m[0] for m in RE_pos.findall(our_file.getvalue())] - exres = ['\rpos0 bar: 0%', - '\n\rpos1 bar: 0%', - '\x1b[A\n\n\rpos2 bar: 0%', - '\x1b[A\x1b[A\n\x1b[A\n\n\rpos3 bar: 0%', - '\x1b[A\x1b[A\rpos0 bar: 10%', - '\n\rpos2 bar: 10%', - '\x1b[A\n\n\rpos3 bar: 10%', - '\x1b[A\x1b[A'] - if res != exres: - raise AssertionError( - "\nExpected:\n{0}\nGot:\n{1}\n".format(str(exres), str(res))) - t4.close() - t3.close() - t1.close() - - -@with_setup(pretest, posttest) -def test_set_description(): - """Test set description""" - with closing(StringIO()) as our_file: - with tqdm(desc='Hello', file=our_file) as t: - assert t.desc == 'Hello' - t.set_description_str('World') - assert t.desc == 'World' - t.set_description() - assert t.desc == '' - t.set_description('Bye') - assert t.desc == 'Bye: ' - assert "World" in our_file.getvalue() - - # without refresh - with closing(StringIO()) as our_file: - with tqdm(desc='Hello', file=our_file) as t: - assert t.desc == 'Hello' - t.set_description_str('World', False) - assert t.desc == 'World' - t.set_description(None, False) - assert t.desc == '' - assert "World" not in our_file.getvalue() - - -@with_setup(pretest, posttest) -def test_deprecated_gui(): - """Test internal GUI properties""" - # Check: StatusPrinter iff gui is disabled - with closing(StringIO()) as our_file: - t = tqdm(total=2, gui=True, file=our_file, miniters=1, mininterval=0) - assert not hasattr(t, "sp") - try: - t.update(1) - except TqdmDeprecationWarning as e: - if 'Please use `tqdm_gui(...)` instead of `tqdm(..., gui=True)`' \ - not in our_file.getvalue(): - raise e - else: - raise DeprecationError('Should not allow manual gui=True without' - ' overriding __iter__() and update()') - finally: - t._instances.clear() - # t.close() - # len(tqdm._instances) += 1 # undo the close() decrement - - t = tqdm(_range(3), gui=True, file=our_file, miniters=1, mininterval=0) - try: - for _ in t: - pass - except TqdmDeprecationWarning as e: - if 'Please use `tqdm_gui(...)` instead of `tqdm(..., gui=True)`' \ - not in our_file.getvalue(): - raise e - else: - raise DeprecationError('Should not allow manual gui=True without' - ' overriding __iter__() and update()') - finally: - t._instances.clear() - # t.close() - # len(tqdm._instances) += 1 # undo the close() decrement - - with tqdm(total=1, gui=False, file=our_file) as t: - assert hasattr(t, "sp") - - -@with_setup(pretest, posttest) -def test_cmp(): - """Test comparison functions""" - with closing(StringIO()) as our_file: - t0 = tqdm(total=10, file=our_file) - t1 = tqdm(total=10, file=our_file) - t2 = tqdm(total=10, file=our_file) - - assert t0 < t1 - assert t2 >= t0 - assert t0 <= t2 - - t3 = tqdm(total=10, file=our_file) - t4 = tqdm(total=10, file=our_file) - t5 = tqdm(total=10, file=our_file) - t5.close() - t6 = tqdm(total=10, file=our_file) - - assert t3 != t4 - assert t3 > t2 - assert t5 == t6 - t6.close() - t4.close() - t3.close() - t2.close() - t1.close() - t0.close() - - -@with_setup(pretest, posttest) -def test_repr(): - """Test representation""" - with closing(StringIO()) as our_file: - with tqdm(total=10, ascii=True, file=our_file) as t: - assert str(t) == ' 0%| | 0/10 [00:00 out3.count('\r') - assert out4.count(", ".join(expected_order)) == 2 - - # Test setting postfix string directly - with closing(StringIO()) as our_file: - with trange(10, file=our_file, desc='pos2 bar', bar_format='{r_bar}', - postfix=None) as t5: - t5.set_postfix_str("Hello", False) - t5.set_postfix_str("World") - out5 = our_file.getvalue() - - assert "Hello" not in out5 - out5 = out5[1:-1].split(', ')[3:] - assert out5 == ["World"] - - -class DummyTqdmFile(object): - """Dummy file-like that will write to tqdm""" - file = None - - def __init__(self, file): - self.file = file - - def write(self, x): - # Avoid print() second call (useless \n) - if len(x.rstrip()) > 0: - tqdm.write(x, file=self.file, nolock=True) - - -@contextmanager -def std_out_err_redirect_tqdm(tqdm_file=sys.stderr): - orig_out_err = sys.stdout, sys.stderr - try: - sys.stdout = sys.stderr = DummyTqdmFile(tqdm_file) - yield orig_out_err[0] - # Relay exceptions - except Exception as exc: - raise exc - # Always restore sys.stdout/err if necessary - finally: - sys.stdout, sys.stderr = orig_out_err - - -@with_setup(pretest, posttest) -def test_file_redirection(): - """Test redirection of output""" - with closing(StringIO()) as our_file: - # Redirect stdout to tqdm.write() - with std_out_err_redirect_tqdm(tqdm_file=our_file): - for _ in trange(3): - print("Such fun") - res = our_file.getvalue() - assert res.count("Such fun\n") == 3 - assert "0/3" in res - assert "3/3" in res - - -@with_setup(pretest, posttest) -def test_external_write(): - """Test external write mode""" - with closing(StringIO()) as our_file: - # Redirect stdout to tqdm.write() - for _ in trange(3, file=our_file): - with tqdm.external_write_mode(file=our_file): - our_file.write("Such fun\n") - res = our_file.getvalue() - assert res.count("Such fun\n") == 3 - assert "0/3" in res - assert "3/3" in res - - -@with_setup(pretest, posttest) -def test_unit_scale(): - """Test numeric `unit_scale`""" - with closing(StringIO()) as our_file: - for _ in tqdm(_range(100), unit_scale=9, file=our_file): - pass - out = our_file.getvalue() - assert '900/900' in out - - -@with_setup(pretest, posttest) -def test_threading(): - """Test multiprocess/thread-realted features""" - from multiprocessing import RLock - try: - mp_lock = RLock() - except OSError: - pass - else: - tqdm.set_lock(mp_lock) - # TODO: test interleaved output #445 diff --git a/lib/tqdm/tests/tests_version.py b/lib/tqdm/tests/tests_version.py deleted file mode 100644 index 226b9980..00000000 --- a/lib/tqdm/tests/tests_version.py +++ /dev/null @@ -1,12 +0,0 @@ -import re - - -def test_version(): - """Test version string""" - from tqdm import __version__ - version_parts = re.split('[.-]', __version__) - assert 3 <= len(version_parts) # must have at least Major.minor.patch - try: - map(int, version_parts[:3]) - except ValueError: - raise TypeError('Version Major.minor.patch must be 3 integers')