Update tempora-4.1.2

This commit is contained in:
JonnyWong16 2021-10-14 21:13:30 -07:00
parent edd2f21ce1
commit a94edb4644
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
6 changed files with 982 additions and 695 deletions

View file

@ -1,22 +1,20 @@
# -*- coding: UTF-8 -*-
"Objects and routines pertaining to date and time (tempora)"
from __future__ import division, unicode_literals
import datetime
import time
import re
import numbers
import functools
import warnings
import contextlib
import six
__metaclass__ = type
from jaraco.functools import once
class Parser:
"""
*deprecated*
Datetime parser: parses a date-time string using multiple possible
formats.
@ -43,12 +41,18 @@ class Parser:
Traceback (most recent call last):
...
ValueError: More than one format string matched target 732.
>>> Parser(('%H',)).parse('22:21')
Traceback (most recent call last):
...
ValueError: No format strings matched the target 22:21.
"""
formats = ('%m/%d/%Y', '%m/%d/%y', '%Y-%m-%d', '%d-%b-%Y', '%d-%b-%y')
"some common default formats"
def __init__(self, formats=None):
warnings.warn("Use dateutil.parser", DeprecationWarning)
if formats:
self.formats = formats
@ -94,28 +98,98 @@ seconds_per_month = seconds_per_year / 12
hours_per_month = hours_per_day * days_per_year / 12
@once
def _needs_year_help():
"""
Some versions of Python render %Y with only three characters :(
https://bugs.python.org/issue39103
"""
return len(datetime.date(900, 1, 1).strftime('%Y')) != 4
def ensure_datetime(ob):
"""
Given a datetime or date or time object from the ``datetime``
module, always return a datetime using default values.
"""
if isinstance(ob, datetime.datetime):
return ob
date = time = ob
if isinstance(ob, datetime.date):
time = datetime.time()
if isinstance(ob, datetime.time):
date = datetime.date(1900, 1, 1)
return datetime.datetime.combine(date, time)
def strftime(fmt, t):
"""A class to replace the strftime in datetime package or time module.
Identical to strftime behavior in those modules except supports any
year.
Also supports datetime.datetime times.
Also supports milliseconds using %s
Also supports microseconds using %u"""
"""
Portable strftime.
In the stdlib, strftime has `known portability problems
<https://bugs.python.org/issue13305>`_. This function
aims to smooth over those issues and provide a
consistent experience across the major platforms.
>>> strftime('%Y', datetime.datetime(1890, 1, 1))
'1890'
>>> strftime('%Y', datetime.datetime(900, 1, 1))
'0900'
Supports time.struct_time, tuples, and datetime.datetime objects.
>>> strftime('%Y-%m-%d', (1976, 5, 7))
'1976-05-07'
Also supports date objects
>>> strftime('%Y', datetime.date(1976, 5, 7))
'1976'
Also supports milliseconds using %s.
>>> strftime('%s', datetime.time(microsecond=20000))
'020'
Also supports microseconds (3 digits) using %µ
>>> strftime('%µ', datetime.time(microsecond=123456))
'456'
Historically, %u was used for microseconds, but now
it honors the value rendered by stdlib.
>>> strftime('%u', datetime.date(1976, 5, 7))
'5'
Also supports microseconds (6 digits) using %f
>>> strftime('%f', datetime.time(microsecond=23456))
'023456'
Even supports time values on date objects (discouraged):
>>> strftime('%f', datetime.date(1976, 1, 1))
'000000'
>>> strftime('%µ', datetime.date(1976, 1, 1))
'000'
>>> strftime('%s', datetime.date(1976, 1, 1))
'000'
And vice-versa:
>>> strftime('%Y', datetime.time())
'1900'
"""
if isinstance(t, (time.struct_time, tuple)):
t = datetime.datetime(*t[:6])
assert isinstance(t, (datetime.datetime, datetime.time, datetime.date))
try:
year = t.year
if year < 1900:
t = t.replace(year=1900)
except AttributeError:
year = 1900
t = ensure_datetime(t)
subs = (
('%Y', '%04d' % year),
('%y', '%02d' % (year % 100)),
('%s', '%03d' % (t.microsecond // 1000)),
('%u', '%03d' % (t.microsecond % 1000))
('%µ', '%03d' % (t.microsecond % 1000)),
)
if _needs_year_help(): # pragma: nocover
subs += (('%Y', '%04d' % t.year),)
def doSub(s, sub):
return s.replace(*sub)
@ -127,95 +201,6 @@ def strftime(fmt, t):
return t.strftime(fmt)
def strptime(s, fmt, tzinfo=None):
"""
A function to replace strptime in the time module. Should behave
identically to the strptime function except it returns a datetime.datetime
object instead of a time.struct_time object.
Also takes an optional tzinfo parameter which is a time zone info object.
"""
res = time.strptime(s, fmt)
return datetime.datetime(tzinfo=tzinfo, *res[:6])
class DatetimeConstructor:
"""
>>> cd = DatetimeConstructor.construct_datetime
>>> cd(datetime.datetime(2011,1,1))
datetime.datetime(2011, 1, 1, 0, 0)
"""
@classmethod
def construct_datetime(cls, *args, **kwargs):
"""Construct a datetime.datetime from a number of different time
types found in python and pythonwin"""
if len(args) == 1:
arg = args[0]
method = cls.__get_dt_constructor(
type(arg).__module__,
type(arg).__name__,
)
result = method(arg)
try:
result = result.replace(tzinfo=kwargs.pop('tzinfo'))
except KeyError:
pass
if kwargs:
first_key = kwargs.keys()[0]
tmpl = (
"{first_key} is an invalid keyword "
"argument for this function."
)
raise TypeError(tmpl.format(**locals()))
else:
result = datetime.datetime(*args, **kwargs)
return result
@classmethod
def __get_dt_constructor(cls, moduleName, name):
try:
method_name = '__dt_from_{moduleName}_{name}__'.format(**locals())
return getattr(cls, method_name)
except AttributeError:
tmpl = (
"No way to construct datetime.datetime from "
"{moduleName}.{name}"
)
raise TypeError(tmpl.format(**locals()))
@staticmethod
def __dt_from_datetime_datetime__(source):
dtattrs = (
'year', 'month', 'day', 'hour', 'minute', 'second',
'microsecond', 'tzinfo',
)
attrs = map(lambda a: getattr(source, a), dtattrs)
return datetime.datetime(*attrs)
@staticmethod
def __dt_from___builtin___time__(pyt):
"Construct a datetime.datetime from a pythonwin time"
fmtString = '%Y-%m-%d %H:%M:%S'
result = strptime(pyt.Format(fmtString), fmtString)
# get milliseconds and microseconds. The only way to do this is
# to use the __float__ attribute of the time, which is in days.
microseconds_per_day = seconds_per_day * 1000000
microseconds = float(pyt) * microseconds_per_day
microsecond = int(microseconds % 1000000)
result = result.replace(microsecond=microsecond)
return result
@staticmethod
def __dt_from_timestamp__(timestamp):
return datetime.datetime.utcfromtimestamp(timestamp)
__dt_from___builtin___float__ = __dt_from_timestamp__
__dt_from___builtin___long__ = __dt_from_timestamp__
__dt_from___builtin___int__ = __dt_from_timestamp__
@staticmethod
def __dt_from_time_struct_time__(s):
return datetime.datetime(*s[:6])
def datetime_mod(dt, period, start=None):
"""
Find the time which is the specified date/time truncated to the time delta
@ -252,6 +237,7 @@ def datetime_mod(dt, period, start=None):
# point errors).
def get_time_delta_microseconds(td):
return (td.days * seconds_per_day + td.seconds) * 1000000 + td.microseconds
delta, period = map(get_time_delta_microseconds, (delta, period))
offset = datetime.timedelta(microseconds=delta % period)
# the result is the original specified time minus the offset
@ -282,6 +268,16 @@ def datetime_round(dt, period, start=None):
def get_nearest_year_for_day(day):
"""
Returns the nearest year to now inferred from a Julian date.
>>> freezer = getfixture('freezer')
>>> freezer.move_to('2019-05-20')
>>> get_nearest_year_for_day(20)
2019
>>> get_nearest_year_for_day(340)
2018
>>> freezer.move_to('2019-12-15')
>>> get_nearest_year_for_day(20)
2020
"""
now = time.gmtime()
result = now.tm_year
@ -322,7 +318,7 @@ def get_period_seconds(period):
...
ValueError: period not in (second, minute, hour, day, month, year)
"""
if isinstance(period, six.string_types):
if isinstance(period, str):
try:
name = 'seconds_per_' + period.lower()
result = globals()[name]
@ -363,7 +359,7 @@ def get_date_format_string(period):
"""
# handle the special case of 'month' which doesn't have
# a static interval in seconds
if isinstance(period, six.string_types) and period.lower() == 'month':
if isinstance(period, str) and period.lower() == 'month':
return '%Y-%m'
file_period_secs = get_period_seconds(period)
format_pieces = ('%Y', '-%m-%d', ' %H', '-%M', '-%S')
@ -391,24 +387,47 @@ def divide_timedelta_float(td, divisor):
>>> divide_timedelta_float(one_day, 2) == half_day
True
"""
# td is comprised of days, seconds, microseconds
dsm = [getattr(td, attr) for attr in ('days', 'seconds', 'microseconds')]
dsm = map(lambda elem: elem / divisor, dsm)
return datetime.timedelta(*dsm)
warnings.warn("Use native division", DeprecationWarning)
return td / divisor
def calculate_prorated_values():
"""
A utility function to prompt for a rate (a string in units per
unit time), and return that same rate for various time periods.
>>> monkeypatch = getfixture('monkeypatch')
>>> import builtins
>>> monkeypatch.setattr(builtins, 'input', lambda prompt: '3/hour')
>>> calculate_prorated_values()
per minute: 0.05
per hour: 3.0
per day: 72.0
per month: 2191.454166666667
per year: 26297.45
"""
rate = input("Enter the rate (3/hour, 50/month)> ")
for period, value in _prorated_values(rate):
print("per {period}: {value}".format(**locals()))
def _prorated_values(rate):
"""
Given a rate (a string in units per unit time), and return that same
rate for various time periods.
>>> for period, value in _prorated_values('20/hour'):
... print('{period}: {value:0.3f}'.format(**locals()))
minute: 0.333
hour: 20.000
day: 480.000
month: 14609.694
year: 175316.333
"""
rate = six.moves.input("Enter the rate (3/hour, 50/month)> ")
res = re.match(r'(?P<value>[\d.]+)/(?P<period>\w+)$', rate).groupdict()
value = float(res['value'])
value_per_second = value / get_period_seconds(res['period'])
for period in ('minute', 'hour', 'day', 'month', 'year'):
period_value = value_per_second * get_period_seconds(period)
print("per {period}: {period_value}".format(**locals()))
yield period, period_value
def parse_timedelta(str):
@ -441,27 +460,185 @@ def parse_timedelta(str):
>>> diff = later.replace(year=now.year) - now
>>> diff.seconds
20940
>>> parse_timedelta('14 seconds foo')
Traceback (most recent call last):
...
ValueError: Unexpected 'foo'
Supports abbreviations:
>>> parse_timedelta('1s')
datetime.timedelta(seconds=1)
>>> parse_timedelta('1sec')
datetime.timedelta(seconds=1)
>>> parse_timedelta('5min1sec')
datetime.timedelta(seconds=301)
>>> parse_timedelta('1 ms')
datetime.timedelta(microseconds=1000)
>>> parse_timedelta('1 µs')
datetime.timedelta(microseconds=1)
>>> parse_timedelta('1 us')
datetime.timedelta(microseconds=1)
And supports the common colon-separated duration:
>>> parse_timedelta('14:00:35.362')
datetime.timedelta(seconds=50435, microseconds=362000)
TODO: Should this be 14 hours or 14 minutes?
>>> parse_timedelta('14:00')
datetime.timedelta(seconds=50400)
>>> parse_timedelta('14:00 minutes')
Traceback (most recent call last):
...
ValueError: Cannot specify units with composite delta
Nanoseconds get rounded to the nearest microsecond:
>>> parse_timedelta('600 ns')
datetime.timedelta(microseconds=1)
>>> parse_timedelta('.002 µs, 499 ns')
datetime.timedelta(microseconds=1)
"""
deltas = (_parse_timedelta_part(part.strip()) for part in str.split(','))
return sum(deltas, datetime.timedelta())
return _parse_timedelta_nanos(str).resolve()
def _parse_timedelta_part(part):
match = re.match(r'(?P<value>[\d.]+) (?P<unit>\w+)', part)
if not match:
msg = "Unable to parse {part!r} as a time delta".format(**locals())
raise ValueError(msg)
unit = match.group('unit').lower()
def _parse_timedelta_nanos(str):
parts = re.finditer(r'(?P<value>[\d.:]+)\s?(?P<unit>[^\W\d_]+)?', str)
chk_parts = _check_unmatched(parts, str)
deltas = map(_parse_timedelta_part, chk_parts)
return sum(deltas, _Saved_NS())
def _check_unmatched(matches, text):
"""
Ensure no words appear in unmatched text.
"""
def check_unmatched(unmatched):
found = re.search(r'\w+', unmatched)
if found:
raise ValueError(f"Unexpected {found.group(0)!r}")
pos = 0
for match in matches:
check_unmatched(text[pos : match.start()])
yield match
pos = match.end()
check_unmatched(text[match.end() :])
_unit_lookup = {
'µs': 'microsecond',
'µsec': 'microsecond',
'us': 'microsecond',
'usec': 'microsecond',
'micros': 'microsecond',
'ms': 'millisecond',
'msec': 'millisecond',
'millis': 'millisecond',
's': 'second',
'sec': 'second',
'h': 'hour',
'hr': 'hour',
'm': 'minute',
'min': 'minute',
'w': 'week',
'wk': 'week',
'd': 'day',
'ns': 'nanosecond',
'nsec': 'nanosecond',
'nanos': 'nanosecond',
}
def _resolve_unit(raw_match):
if raw_match is None:
return 'second'
text = raw_match.lower()
return _unit_lookup.get(text, text)
def _parse_timedelta_composite(raw_value, unit):
if unit != 'seconds':
raise ValueError("Cannot specify units with composite delta")
values = raw_value.split(':')
units = 'hours', 'minutes', 'seconds'
composed = ' '.join(f'{value} {unit}' for value, unit in zip(values, units))
return _parse_timedelta_nanos(composed)
def _parse_timedelta_part(match):
unit = _resolve_unit(match.group('unit'))
if not unit.endswith('s'):
unit += 's'
value = float(match.group('value'))
raw_value = match.group('value')
if ':' in raw_value:
return _parse_timedelta_composite(raw_value, unit)
value = float(raw_value)
if unit == 'months':
unit = 'years'
value = value / 12
if unit == 'years':
unit = 'days'
value = value * days_per_year
return datetime.timedelta(**{unit: value})
return _Saved_NS.derive(unit, value)
class _Saved_NS:
"""
Bundle a timedelta with nanoseconds.
>>> _Saved_NS.derive('microseconds', .001)
_Saved_NS(td=datetime.timedelta(0), nanoseconds=1)
"""
td = datetime.timedelta()
nanoseconds = 0
multiplier = dict(
seconds=1000000000,
milliseconds=1000000,
microseconds=1000,
)
def __init__(self, **kwargs):
vars(self).update(kwargs)
@classmethod
def derive(cls, unit, value):
if unit == 'nanoseconds':
return _Saved_NS(nanoseconds=value)
res = _Saved_NS(td=datetime.timedelta(**{unit: value}))
with contextlib.suppress(KeyError):
res.nanoseconds = int(value * cls.multiplier[unit]) % 1000
return res
def __add__(self, other):
return _Saved_NS(
td=self.td + other.td, nanoseconds=self.nanoseconds + other.nanoseconds
)
def resolve(self):
"""
Resolve any nanoseconds into the microseconds field,
discarding any nanosecond resolution (but honoring partial
microseconds).
"""
addl_micros = round(self.nanoseconds / 1000)
return self.td + datetime.timedelta(microseconds=addl_micros)
def __repr__(self):
return f'_Saved_NS(td={self.td!r}, nanoseconds={self.nanoseconds!r})'
def divide_timedelta(td1, td2):
@ -473,12 +650,8 @@ def divide_timedelta(td1, td2):
>>> divide_timedelta(one_hour, one_day) == 1 / 24
True
"""
try:
warnings.warn("Use native division", DeprecationWarning)
return td1 / td2
except TypeError:
# Python 3.2 gets division
# http://bugs.python.org/issue2706
return td1.total_seconds() / td2.total_seconds()
def date_range(start=None, stop=None, step=None):
@ -496,6 +669,9 @@ def date_range(start=None, stop=None, step=None):
True
>>> datetime.datetime(2005,12,25) in my_range
False
>>> from_now = date_range(stop=datetime.datetime(2099, 12, 31))
>>> next(from_now)
datetime.datetime(...)
"""
if step is None:
step = datetime.timedelta(days=1)

View file

@ -1,10 +1,17 @@
# -*- coding: utf-8 -*-
"""
Classes for calling functions a schedule.
"""
Classes for calling functions a schedule. Has time zone support.
from __future__ import absolute_import
For example, to run a job at 08:00 every morning in 'Asia/Calcutta':
>>> job = lambda: print("time is now", datetime.datetime())
>>> time = datetime.time(8, tzinfo=pytz.timezone('Asia/Calcutta'))
>>> cmd = PeriodicCommandFixedDelay.daily_at(time, job)
>>> sched = InvokeScheduler()
>>> sched.add(cmd)
>>> while True: # doctest: +SKIP
... sched.run_pending()
... time.sleep(.1)
"""
import datetime
import numbers
@ -13,8 +20,6 @@ import bisect
import pytz
__metaclass__ = type
def now():
"""
@ -44,8 +49,13 @@ class DelayedCommand(datetime.datetime):
@classmethod
def from_datetime(cls, other):
return cls(
other.year, other.month, other.day, other.hour,
other.minute, other.second, other.microsecond,
other.year,
other.month,
other.day,
other.hour,
other.minute,
other.second,
other.microsecond,
other.tzinfo,
)
@ -91,6 +101,7 @@ class PeriodicCommand(DelayedCommand):
Like a delayed command, but expect this command to run every delay
seconds.
"""
def _next_time(self):
"""
Add delay to self, localized
@ -117,8 +128,7 @@ class PeriodicCommand(DelayedCommand):
def __setattr__(self, key, value):
if key == 'delay' and not value > datetime.timedelta():
raise ValueError(
"A PeriodicCommand must have a positive, "
"non-zero delay."
"A PeriodicCommand must have a positive, " "non-zero delay."
)
super(PeriodicCommand, self).__setattr__(key, value)
@ -132,6 +142,11 @@ class PeriodicCommandFixedDelay(PeriodicCommand):
@classmethod
def at_time(cls, at, delay, target):
"""
>>> cmd = PeriodicCommandFixedDelay.at_time(0, 30, None)
>>> cmd.delay.total_seconds()
30.0
"""
at = cls._from_timestamp(at)
cmd = cls.from_datetime(at)
if isinstance(delay, numbers.Number):
@ -144,11 +159,18 @@ class PeriodicCommandFixedDelay(PeriodicCommand):
def daily_at(cls, at, target):
"""
Schedule a command to run at a specific time each day.
>>> from tempora import utc
>>> noon = utc.time(12, 0)
>>> cmd = PeriodicCommandFixedDelay.daily_at(noon, None)
>>> cmd.delay.total_seconds()
86400.0
"""
daily = datetime.timedelta(days=1)
# convert when to the next datetime matching this time
when = datetime.datetime.combine(datetime.date.today(), at)
if when < now():
when -= daily
while when < now():
when += daily
return cls.at_time(cls._localize(when), daily, target)
@ -158,6 +180,7 @@ class Scheduler:
A rudimentary abstract scheduler accepting DelayedCommands
and dispatching them on schedule.
"""
def __init__(self):
self.queue = []
@ -186,6 +209,7 @@ class InvokeScheduler(Scheduler):
"""
Command targets are functions to be invoked on schedule.
"""
def run(self, command):
command.target()
@ -194,8 +218,9 @@ class CallbackScheduler(Scheduler):
"""
Command targets are passed to a dispatch callable on schedule.
"""
def __init__(self, dispatch):
super(CallbackScheduler, self).__init__()
super().__init__()
self.dispatch = dispatch
def run(self, command):

View file

@ -1,6 +1,7 @@
import time
import random
import datetime
from unittest import mock
import pytest
import pytz
@ -8,24 +9,8 @@ import freezegun
from tempora import schedule
__metaclass__ = type
@pytest.fixture
def naive_times(monkeypatch):
monkeypatch.setattr(
'irc.schedule.from_timestamp',
datetime.datetime.fromtimestamp)
monkeypatch.setattr('irc.schedule.now', datetime.datetime.now)
do_nothing = type(None)
try:
do_nothing()
except TypeError:
# Python 2 compat
def do_nothing():
return None
def test_delayed_command_order():
@ -33,10 +18,9 @@ def test_delayed_command_order():
delayed commands should be sorted by delay time
"""
delays = [random.randint(0, 99) for x in range(5)]
cmds = sorted([
schedule.DelayedCommand.after(delay, do_nothing)
for delay in delays
])
cmds = sorted(
[schedule.DelayedCommand.after(delay, do_nothing) for delay in delays]
)
assert [c.delay.seconds for c in cmds] == sorted(delays)
@ -53,9 +37,7 @@ def test_periodic_command_fixed_delay():
delay.
"""
fd = schedule.PeriodicCommandFixedDelay.at_time(
at=schedule.now(),
delay=datetime.timedelta(seconds=2),
target=lambda: None,
at=schedule.now(), delay=datetime.timedelta(seconds=2), target=lambda: None
)
assert fd.due() is True
assert fd.next().due() is False
@ -82,6 +64,16 @@ class TestCommands:
two_days_from_now = day_from_now + daily
assert day_from_now < next_cmd < two_days_from_now
@pytest.mark.parametrize("hour", range(10, 14))
@pytest.mark.parametrize("tz_offset", (14, -14))
def test_command_at_noon_distant_local(self, hour, tz_offset):
"""
Run test_command_at_noon, but with the local timezone
more than 12 hours away from UTC.
"""
with freezegun.freeze_time(f"2020-01-10 {hour:02}:01", tz_offset=tz_offset):
self.test_command_at_noon()
class TestTimezones:
def test_alternate_timezone_west(self):
@ -105,8 +97,7 @@ class TestTimezones:
target_tz = pytz.timezone('US/Eastern')
target_time = datetime.time(9, tzinfo=target_tz)
cmd = schedule.PeriodicCommandFixedDelay.daily_at(
target_time,
target=lambda: None,
target_time, target=lambda: None
)
def naive(dt):
@ -116,3 +107,43 @@ class TestTimezones:
next_ = cmd.next()
assert naive(next_) == datetime.datetime(2018, 3, 11, 9, 0, 0)
assert next_ - cmd == datetime.timedelta(hours=23)
class TestScheduler:
def test_invoke_scheduler(self):
sched = schedule.InvokeScheduler()
target = mock.MagicMock()
cmd = schedule.DelayedCommand.after(0, target)
sched.add(cmd)
sched.run_pending()
target.assert_called_once()
assert not sched.queue
def test_callback_scheduler(self):
callback = mock.MagicMock()
sched = schedule.CallbackScheduler(callback)
target = mock.MagicMock()
cmd = schedule.DelayedCommand.after(0, target)
sched.add(cmd)
sched.run_pending()
callback.assert_called_once_with(target)
def test_periodic_command(self):
sched = schedule.InvokeScheduler()
target = mock.MagicMock()
before = datetime.datetime.utcnow()
cmd = schedule.PeriodicCommand.after(10, target)
sched.add(cmd)
sched.run_pending()
target.assert_not_called()
with freezegun.freeze_time(before + datetime.timedelta(seconds=15)):
sched.run_pending()
assert sched.queue
target.assert_called_once()
with freezegun.freeze_time(before + datetime.timedelta(seconds=25)):
sched.run_pending()
assert target.call_count == 2

View file

@ -0,0 +1,50 @@
import datetime
import time
import contextlib
import os
from unittest import mock
import pytest
from tempora import timing
def test_IntervalGovernor():
"""
IntervalGovernor should prevent a function from being called more than
once per interval.
"""
func_under_test = mock.MagicMock()
# to look like a function, it needs a __name__ attribute
func_under_test.__name__ = 'func_under_test'
interval = datetime.timedelta(seconds=1)
governed = timing.IntervalGovernor(interval)(func_under_test)
governed('a')
governed('b')
governed(3, 'sir')
func_under_test.assert_called_once_with('a')
@pytest.fixture
def alt_tz(monkeypatch):
hasattr(time, 'tzset') or pytest.skip("tzset not available")
@contextlib.contextmanager
def change():
val = 'AEST-10AEDT-11,M10.5.0,M3.5.0'
with monkeypatch.context() as ctx:
ctx.setitem(os.environ, 'TZ', val)
time.tzset()
yield
time.tzset()
return change()
def test_Stopwatch_timezone_change(alt_tz):
"""
The stopwatch should provide a consistent duration even
if the timezone changes.
"""
watch = timing.Stopwatch()
with alt_tz:
assert abs(watch.split().total_seconds()) < 0.1

View file

@ -1,20 +1,13 @@
# -*- coding: utf-8 -*-
from __future__ import unicode_literals, absolute_import
import datetime
import functools
import numbers
import time
import six
import collections.abc
import contextlib
import jaraco.functools
__metaclass__ = type
class Stopwatch:
"""
A simple stopwatch which starts automatically.
@ -40,20 +33,21 @@ class Stopwatch:
... assert isinstance(watch.split(), datetime.timedelta)
In that case, the watch is stopped when the context is exited,
so to read the elapsed time::
so to read the elapsed time:
>>> watch.elapsed
datetime.timedelta(...)
>>> watch.elapsed.seconds
0
"""
def __init__(self):
self.reset()
self.start()
def reset(self):
self.elapsed = datetime.timedelta(0)
if hasattr(self, 'start_time'):
with contextlib.suppress(AttributeError):
del self.start_time
def start(self):
@ -82,7 +76,12 @@ class IntervalGovernor:
"""
Decorate a function to only allow it to be called once per
min_interval. Otherwise, it returns None.
>>> gov = IntervalGovernor(30)
>>> gov.min_interval.total_seconds()
30.0
"""
def __init__(self, min_interval):
if isinstance(min_interval, numbers.Number):
min_interval = datetime.timedelta(seconds=min_interval)
@ -92,13 +91,11 @@ class IntervalGovernor:
def decorate(self, func):
@functools.wraps(func)
def wrapper(*args, **kwargs):
allow = (
not self.last_call
or self.last_call.split() > self.min_interval
)
allow = not self.last_call or self.last_call.split() > self.min_interval
if allow:
self.last_call = Stopwatch()
return func(*args, **kwargs)
return wrapper
__call__ = decorate
@ -115,12 +112,21 @@ class Timer(Stopwatch):
>>> t.expired()
True
"""
def __init__(self, target=float('Inf')):
self.target = self._accept(target)
super(Timer, self).__init__()
def _accept(self, target):
"Accept None or ∞ or datetime or numeric for target"
@staticmethod
def _accept(target):
"""
Accept None or or datetime or numeric for target
>>> Timer._accept(datetime.timedelta(seconds=30))
30.0
>>> Timer._accept(None)
inf
"""
if isinstance(target, datetime.timedelta):
target = target.total_seconds()
@ -134,7 +140,7 @@ class Timer(Stopwatch):
return self.split().total_seconds() > self.target
class BackoffDelay(six.Iterator):
class BackoffDelay(collections.abc.Iterator):
"""
Exponential backoff delay.
@ -231,12 +237,14 @@ class BackoffDelay(six.Iterator):
def limit(n):
return max(0, min(limit_, n))
self.limit = limit
if isinstance(jitter, numbers.Number):
jitter_ = jitter
def jitter():
return jitter_
self.jitter = jitter
def __call__(self):

View file

@ -31,9 +31,6 @@ __all__ = ['now', 'fromtimestamp', 'datetime', 'time']
now = functools.partial(std.datetime.now, std.timezone.utc)
fromtimestamp = functools.partial(
std.datetime.fromtimestamp,
tz=std.timezone.utc,
)
fromtimestamp = functools.partial(std.datetime.fromtimestamp, tz=std.timezone.utc)
datetime = functools.partial(std.datetime, tzinfo=std.timezone.utc)
time = functools.partial(std.time, tzinfo=std.timezone.utc)