Update apscheduler to version 3.6.3

This commit is contained in:
JonnyWong16 2019-11-23 14:38:11 -08:00
parent e5a3d534b2
commit 244a3e5be3
13 changed files with 141 additions and 64 deletions

View file

@ -3,7 +3,7 @@ __all__ = ('EVENT_SCHEDULER_STARTED', 'EVENT_SCHEDULER_SHUTDOWN', 'EVENT_SCHEDUL
'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED', 'EVENT_ALL_JOBS_REMOVED',
'EVENT_JOB_ADDED', 'EVENT_JOB_REMOVED', 'EVENT_JOB_MODIFIED', 'EVENT_JOB_EXECUTED',
'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', 'EVENT_JOB_SUBMITTED', 'EVENT_JOB_MAX_INSTANCES',
'SchedulerEvent', 'JobEvent', 'JobExecutionEvent')
'SchedulerEvent', 'JobEvent', 'JobExecutionEvent', 'JobSubmissionEvent')
EVENT_SCHEDULER_STARTED = EVENT_SCHEDULER_START = 2 ** 0

View file

@ -3,12 +3,11 @@ from __future__ import absolute_import
import sys
from apscheduler.executors.base import BaseExecutor, run_job
from apscheduler.util import iscoroutinefunction_partial
try:
from asyncio import iscoroutinefunction
from apscheduler.executors.base_py3 import run_coroutine_job
except ImportError:
from trollius import iscoroutinefunction
run_coroutine_job = None
@ -46,7 +45,7 @@ class AsyncIOExecutor(BaseExecutor):
else:
self._run_job_success(job.id, events)
if iscoroutinefunction(job.func):
if iscoroutinefunction_partial(job.func):
if run_coroutine_job is not None:
coro = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name)
f = self._eventloop.create_task(coro)

View file

@ -8,10 +8,10 @@ from tornado.gen import convert_yielded
from apscheduler.executors.base import BaseExecutor, run_job
try:
from inspect import iscoroutinefunction
from apscheduler.executors.base_py3 import run_coroutine_job
from apscheduler.util import iscoroutinefunction_partial
except ImportError:
def iscoroutinefunction(func):
def iscoroutinefunction_partial(func):
return False
@ -44,7 +44,7 @@ class TornadoExecutor(BaseExecutor):
else:
self._run_job_success(job.id, events)
if iscoroutinefunction(job.func):
if iscoroutinefunction_partial(job.func):
f = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name)
else:
f = self.executor.submit(run_job, job, job._jobstore_alias, run_times,

View file

@ -1,4 +1,4 @@
from collections import Iterable, Mapping
from inspect import ismethod, isclass
from uuid import uuid4
import six
@ -8,6 +8,11 @@ from apscheduler.util import (
ref_to_obj, obj_to_ref, datetime_repr, repr_escape, get_callable_name, check_callable_args,
convert_to_datetime)
try:
from collections.abc import Iterable, Mapping
except ImportError:
from collections import Iterable, Mapping
class Job(object):
"""
@ -235,13 +240,20 @@ class Job(object):
'be determined. Consider giving a textual reference (module:function name) '
'instead.' % (self.func,))
# Instance methods cannot survive serialization as-is, so store the "self" argument
# explicitly
if ismethod(self.func) and not isclass(self.func.__self__):
args = (self.func.__self__,) + tuple(self.args)
else:
args = self.args
return {
'version': 1,
'id': self.id,
'func': self.func_ref,
'trigger': self.trigger,
'executor': self.executor,
'args': self.args,
'args': args,
'kwargs': self.kwargs,
'name': self.name,
'misfire_grace_time': self.misfire_grace_time,

View file

@ -14,7 +14,7 @@ except ImportError: # pragma: nocover
import pickle
try:
from redis import StrictRedis
from redis import Redis
except ImportError: # pragma: nocover
raise ImportError('RedisJobStore requires redis installed')
@ -47,7 +47,7 @@ class RedisJobStore(BaseJobStore):
self.pickle_protocol = pickle_protocol
self.jobs_key = jobs_key
self.run_times_key = run_times_key
self.redis = StrictRedis(db=int(db), **connect_args)
self.redis = Redis(db=int(db), **connect_args)
def lookup_job(self, job_id):
job_state = self.redis.hget(self.jobs_key, job_id)
@ -81,7 +81,9 @@ class RedisJobStore(BaseJobStore):
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
self.pickle_protocol))
if job.next_run_time:
pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
pipe.zadd(self.run_times_key,
{job.id: datetime_to_utc_timestamp(job.next_run_time)})
pipe.execute()
def update_job(self, job):
@ -92,9 +94,11 @@ class RedisJobStore(BaseJobStore):
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
self.pickle_protocol))
if job.next_run_time:
pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
pipe.zadd(self.run_times_key,
{job.id: datetime_to_utc_timestamp(job.next_run_time)})
else:
pipe.zrem(self.run_times_key, job.id)
pipe.execute()
def remove_job(self, job_id):

View file

@ -10,7 +10,7 @@ except ImportError: # pragma: nocover
import pickle
try:
import rethinkdb as r
from rethinkdb import RethinkDB
except ImportError: # pragma: nocover
raise ImportError('RethinkDBJobStore requires rethinkdb installed')
@ -40,10 +40,12 @@ class RethinkDBJobStore(BaseJobStore):
raise ValueError('The "table" parameter must not be empty')
self.database = database
self.table = table
self.table_name = table
self.table = None
self.client = client
self.pickle_protocol = pickle_protocol
self.connect_args = connect_args
self.r = RethinkDB()
self.conn = None
def start(self, scheduler, alias):
@ -52,31 +54,31 @@ class RethinkDBJobStore(BaseJobStore):
if self.client:
self.conn = maybe_ref(self.client)
else:
self.conn = r.connect(db=self.database, **self.connect_args)
self.conn = self.r.connect(db=self.database, **self.connect_args)
if self.database not in r.db_list().run(self.conn):
r.db_create(self.database).run(self.conn)
if self.database not in self.r.db_list().run(self.conn):
self.r.db_create(self.database).run(self.conn)
if self.table not in r.table_list().run(self.conn):
r.table_create(self.table).run(self.conn)
if self.table_name not in self.r.table_list().run(self.conn):
self.r.table_create(self.table_name).run(self.conn)
if 'next_run_time' not in r.table(self.table).index_list().run(self.conn):
r.table(self.table).index_create('next_run_time').run(self.conn)
if 'next_run_time' not in self.r.table(self.table_name).index_list().run(self.conn):
self.r.table(self.table_name).index_create('next_run_time').run(self.conn)
self.table = r.db(self.database).table(self.table)
self.table = self.r.db(self.database).table(self.table_name)
def lookup_job(self, job_id):
results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
return self._reconstitute_job(results[0]['job_state']) if results else None
def get_due_jobs(self, now):
return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
return self._get_jobs(self.r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
def get_next_run_time(self):
results = list(
self.table
.filter(r.row['next_run_time'] != None) # flake8: noqa
.order_by(r.asc('next_run_time'))
.filter(self.r.row['next_run_time'] != None) # noqa
.order_by(self.r.asc('next_run_time'))
.map(lambda x: x['next_run_time'])
.limit(1)
.run(self.conn)
@ -92,7 +94,7 @@ class RethinkDBJobStore(BaseJobStore):
job_dict = {
'id': job.id,
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
results = self.table.insert(job_dict).run(self.conn)
if results['errors'] > 0:
@ -101,7 +103,7 @@ class RethinkDBJobStore(BaseJobStore):
def update_job(self, job):
changes = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
'job_state': self.r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
results = self.table.get_all(job.id).update(changes).run(self.conn)
skipped = False in map(lambda x: results[x] == 0, results.keys())
@ -130,20 +132,20 @@ class RethinkDBJobStore(BaseJobStore):
def _get_jobs(self, predicate=None):
jobs = []
failed_job_ids = []
query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if
predicate else self.table)
query = (self.table.filter(self.r.row['next_run_time'] != None).filter(predicate) # noqa
if predicate else self.table)
query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')
for document in query.run(self.conn):
try:
jobs.append(self._reconstitute_job(document['job_state']))
except:
except Exception:
self._logger.exception('Unable to restore job "%s" -- removing it', document['id'])
failed_job_ids.append(document['id'])
# Remove all the jobs we failed to restore
if failed_job_ids:
r.expr(failed_job_ids).for_each(
self.r.expr(failed_job_ids).for_each(
lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)
return jobs

View file

@ -106,7 +106,7 @@ class SQLAlchemyJobStore(BaseJobStore):
}).where(self.jobs_t.c.id == job.id)
result = self.engine.execute(update)
if result.rowcount == 0:
raise JobLookupError(id)
raise JobLookupError(job.id)
def remove_job(self, job_id):
delete = self.jobs_t.delete().where(self.jobs_t.c.id == job_id)

View file

@ -1,7 +1,6 @@
from __future__ import print_function
from abc import ABCMeta, abstractmethod
from collections import MutableMapping
from threading import RLock
from datetime import datetime, timedelta
from logging import getLogger
@ -19,13 +18,19 @@ from apscheduler.jobstores.base import ConflictingIdError, JobLookupError, BaseJ
from apscheduler.jobstores.memory import MemoryJobStore
from apscheduler.job import Job
from apscheduler.triggers.base import BaseTrigger
from apscheduler.util import asbool, asint, astimezone, maybe_ref, timedelta_seconds, undefined
from apscheduler.util import (
asbool, asint, astimezone, maybe_ref, timedelta_seconds, undefined, TIMEOUT_MAX)
from apscheduler.events import (
SchedulerEvent, JobEvent, JobSubmissionEvent, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN,
EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED,
EVENT_JOB_ADDED, EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED, EVENT_ALL_JOBS_REMOVED,
EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_RESUMED, EVENT_SCHEDULER_PAUSED)
try:
from collections.abc import MutableMapping
except ImportError:
from collections import MutableMapping
#: constant indicating a scheduler's stopped state
STATE_STOPPED = 0
#: constant indicating a scheduler's running state (started and processing jobs)
@ -126,11 +131,14 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:param bool paused: if ``True``, don't start job processing until :meth:`resume` is called
:raises SchedulerAlreadyRunningError: if the scheduler is already running
:raises RuntimeError: if running under uWSGI with threads disabled
"""
if self.state != STATE_STOPPED:
raise SchedulerAlreadyRunningError
self._check_uwsgi()
with self._executors_lock:
# Create a default executor if nothing else is configured
if 'default' not in self._executors:
@ -177,12 +185,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self.state = STATE_STOPPED
with self._jobstores_lock, self._executors_lock:
# Shut down all executors
with self._executors_lock:
for executor in six.itervalues(self._executors):
executor.shutdown(wait)
# Shut down all job stores
with self._jobstores_lock:
for jobstore in six.itervalues(self._jobstores):
jobstore.shutdown()
@ -546,7 +555,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
"""
if pending is not None:
warnings.warn('The "pending" option is deprecated -- get_jobs() always returns '
'pending jobs if the scheduler has been started and scheduled jobs '
'scheduled jobs if the scheduler has been started and pending jobs '
'otherwise', DeprecationWarning)
with self._jobstores_lock:
@ -589,7 +598,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
"""
jobstore_alias = None
with self._jobstores_lock:
if self.state == STATE_STOPPED:
# Check if the job is among the pending jobs
if self.state == STATE_STOPPED:
for i, (job, alias, replace_existing) in enumerate(self._pending_jobs):
@ -824,6 +832,14 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
except BaseException:
self._logger.exception('Error notifying listener')
def _check_uwsgi(self):
"""Check if we're running under uWSGI with threads disabled."""
uwsgi_module = sys.modules.get('uwsgi')
if not getattr(uwsgi_module, 'has_threads', True):
raise RuntimeError('The scheduler seems to be running under uWSGI, but threads have '
'been disabled. You must run uWSGI with the --enable-threads '
'option for the scheduler to work.')
def _real_add_job(self, job, jobstore_alias, replace_existing):
"""
:param Job job: the job to add
@ -999,7 +1015,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
wait_seconds = None
self._logger.debug('No jobs; waiting until a job is added')
else:
wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0)
wait_seconds = min(max(timedelta_seconds(next_wakeup_time - now), 0), TIMEOUT_MAX)
self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time,
wait_seconds)

View file

@ -9,7 +9,7 @@ except (ImportError, RuntimeError): # pragma: nocover
from PyQt4.QtCore import QObject, QTimer
except ImportError:
try:
from PySide.QtCore import QObject, QTimer # flake8: noqa
from PySide.QtCore import QObject, QTimer # noqa
except ImportError:
raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed')
@ -26,7 +26,8 @@ class QtScheduler(BaseScheduler):
def _start_timer(self, wait_seconds):
self._stop_timer()
if wait_seconds is not None:
self._timer = QTimer.singleShot(wait_seconds * 1000, self._process_jobs)
wait_time = min(wait_seconds * 1000, 2147483647)
self._timer = QTimer.singleShot(wait_time, self._process_jobs)
def _stop_timer(self):
if self._timer:

View file

@ -192,9 +192,8 @@ class CronTrigger(BaseTrigger):
return None
if fieldnum >= 0:
if self.jitter is not None:
next_date = self._apply_jitter(next_date, self.jitter, now)
return next_date
return min(next_date, self.end_date) if self.end_date else next_date
def __getstate__(self):
return {

View file

@ -9,7 +9,7 @@ __all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression',
'WeekdayPositionExpression', 'LastDayOfMonthExpression')
WEEKDAYS = ['sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat']
WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun']
MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec']

View file

@ -104,7 +104,7 @@ class DayOfWeekField(BaseField):
COMPILERS = BaseField.COMPILERS + [WeekdayRangeExpression]
def get_value(self, dateval):
return dateval.isoweekday() % 7
return dateval.weekday()
class MonthField(BaseField):

View file

@ -1,12 +1,14 @@
"""This module contains several handy functions primarily meant for internal use."""
from __future__ import division
from datetime import date, datetime, time, timedelta, tzinfo
from calendar import timegm
import re
from functools import partial
from inspect import isclass, ismethod
import re
from pytz import timezone, utc
from pytz import timezone, utc, FixedOffset
import six
try:
@ -19,9 +21,19 @@ try:
except ImportError:
TIMEOUT_MAX = 4294967 # Maximum value accepted by Event.wait() on Windows
try:
from asyncio import iscoroutinefunction
except ImportError:
try:
from trollius import iscoroutinefunction
except ImportError:
def iscoroutinefunction(func):
return False
__all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp',
'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'get_callable_name',
'obj_to_ref', 'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args')
'obj_to_ref', 'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args',
'TIMEOUT_MAX')
class _Undefined(object):
@ -92,8 +104,9 @@ def astimezone(obj):
_DATE_REGEX = re.compile(
r'(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})'
r'(?: (?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
r'(?:\.(?P<microsecond>\d{1,6}))?)?')
r'(?:[ T](?P<hour>\d{1,2}):(?P<minute>\d{1,2}):(?P<second>\d{1,2})'
r'(?:\.(?P<microsecond>\d{1,6}))?'
r'(?P<timezone>Z|[+-]\d\d:\d\d)?)?$')
def convert_to_datetime(input, tz, arg_name):
@ -105,7 +118,9 @@ def convert_to_datetime(input, tz, arg_name):
If the input is a string, it is parsed as a datetime with the given timezone.
Date strings are accepted in three different forms: date only (Y-m-d), date with time
(Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro).
(Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro). Additionally you can
override the time zone by giving a specific offset in the format specified by ISO 8601:
Z (UTC), +HH:MM or -HH:MM.
:param str|datetime input: the datetime or string to convert to a timezone aware datetime
:param datetime.tzinfo tz: timezone to interpret ``input`` in
@ -123,8 +138,17 @@ def convert_to_datetime(input, tz, arg_name):
m = _DATE_REGEX.match(input)
if not m:
raise ValueError('Invalid date string')
values = [(k, int(v or 0)) for k, v in m.groupdict().items()]
values = dict(values)
values = m.groupdict()
tzname = values.pop('timezone')
if tzname == 'Z':
tz = utc
elif tzname:
hours, minutes = (int(x) for x in tzname[1:].split(':'))
sign = 1 if tzname[0] == '+' else -1
tz = FixedOffset(sign * (hours * 60 + minutes))
values = {k: int(v or 0) for k, v in values.items()}
datetime_ = datetime(**values)
else:
raise TypeError('Unsupported type for %s: %s' % (arg_name, input.__class__.__name__))
@ -210,7 +234,7 @@ def get_callable_name(func):
# class methods, bound and unbound methods
f_self = getattr(func, '__self__', None) or getattr(func, 'im_self', None)
if f_self and hasattr(func, '__name__'):
f_class = f_self if isinstance(f_self, type) else f_self.__class__
f_class = f_self if isclass(f_self) else f_self.__class__
else:
f_class = getattr(func, 'im_class', None)
@ -248,7 +272,18 @@ def obj_to_ref(obj):
if '<locals>' in name:
raise ValueError('Cannot create a reference to a nested function')
return '%s:%s' % (obj.__module__, name)
if ismethod(obj):
if hasattr(obj, 'im_self') and obj.im_self:
# bound method
module = obj.im_self.__module__
elif hasattr(obj, 'im_class') and obj.im_class:
# unbound method
module = obj.im_class.__module__
else:
module = obj.__module__
else:
module = obj.__module__
return '%s:%s' % (module, name)
def ref_to_obj(ref):
@ -383,3 +418,12 @@ def check_callable_args(func, args, kwargs):
raise ValueError(
'The target callable does not accept the following keyword arguments: %s' %
', '.join(unmatched_kwargs))
def iscoroutinefunction_partial(f):
while isinstance(f, partial):
f = f.func
# The asyncio version of iscoroutinefunction includes testing for @coroutine
# decorations vs. the inspect version which does not.
return iscoroutinefunction(f)