diff --git a/lib/apscheduler/__init__.py b/lib/apscheduler/__init__.py index 283002b7..968169a9 100644 --- a/lib/apscheduler/__init__.py +++ b/lib/apscheduler/__init__.py @@ -1,5 +1,10 @@ -version_info = (3, 0, 1) -version = '3.0.1' -release = '3.0.1' +from pkg_resources import get_distribution, DistributionNotFound -__version__ = release # PEP 396 +try: + release = get_distribution('APScheduler').version.split('-')[0] +except DistributionNotFound: + release = '3.5.0' + +version_info = tuple(int(x) if x.isdigit() else x for x in release.split('.')) +version = __version__ = '.'.join(str(x) for x in version_info[:3]) +del get_distribution, DistributionNotFound diff --git a/lib/apscheduler/events.py b/lib/apscheduler/events.py index 9418263d..890763eb 100644 --- a/lib/apscheduler/events.py +++ b/lib/apscheduler/events.py @@ -1,25 +1,33 @@ -__all__ = ('EVENT_SCHEDULER_START', 'EVENT_SCHEDULER_SHUTDOWN', 'EVENT_EXECUTOR_ADDED', 'EVENT_EXECUTOR_REMOVED', - 'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED', 'EVENT_ALL_JOBS_REMOVED', 'EVENT_JOB_ADDED', - 'EVENT_JOB_REMOVED', 'EVENT_JOB_MODIFIED', 'EVENT_JOB_EXECUTED', 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', +__all__ = ('EVENT_SCHEDULER_STARTED', 'EVENT_SCHEDULER_SHUTDOWN', 'EVENT_SCHEDULER_PAUSED', + 'EVENT_SCHEDULER_RESUMED', 'EVENT_EXECUTOR_ADDED', 'EVENT_EXECUTOR_REMOVED', + 'EVENT_JOBSTORE_ADDED', 'EVENT_JOBSTORE_REMOVED', 'EVENT_ALL_JOBS_REMOVED', + 'EVENT_JOB_ADDED', 'EVENT_JOB_REMOVED', 'EVENT_JOB_MODIFIED', 'EVENT_JOB_EXECUTED', + 'EVENT_JOB_ERROR', 'EVENT_JOB_MISSED', 'EVENT_JOB_SUBMITTED', 'EVENT_JOB_MAX_INSTANCES', 'SchedulerEvent', 'JobEvent', 'JobExecutionEvent') -EVENT_SCHEDULER_START = 1 -EVENT_SCHEDULER_SHUTDOWN = 2 -EVENT_EXECUTOR_ADDED = 4 -EVENT_EXECUTOR_REMOVED = 8 -EVENT_JOBSTORE_ADDED = 16 -EVENT_JOBSTORE_REMOVED = 32 -EVENT_ALL_JOBS_REMOVED = 64 -EVENT_JOB_ADDED = 128 -EVENT_JOB_REMOVED = 256 -EVENT_JOB_MODIFIED = 512 -EVENT_JOB_EXECUTED = 1024 -EVENT_JOB_ERROR = 2048 -EVENT_JOB_MISSED = 4096 -EVENT_ALL = (EVENT_SCHEDULER_START | EVENT_SCHEDULER_SHUTDOWN | EVENT_JOBSTORE_ADDED | EVENT_JOBSTORE_REMOVED | +EVENT_SCHEDULER_STARTED = EVENT_SCHEDULER_START = 2 ** 0 +EVENT_SCHEDULER_SHUTDOWN = 2 ** 1 +EVENT_SCHEDULER_PAUSED = 2 ** 2 +EVENT_SCHEDULER_RESUMED = 2 ** 3 +EVENT_EXECUTOR_ADDED = 2 ** 4 +EVENT_EXECUTOR_REMOVED = 2 ** 5 +EVENT_JOBSTORE_ADDED = 2 ** 6 +EVENT_JOBSTORE_REMOVED = 2 ** 7 +EVENT_ALL_JOBS_REMOVED = 2 ** 8 +EVENT_JOB_ADDED = 2 ** 9 +EVENT_JOB_REMOVED = 2 ** 10 +EVENT_JOB_MODIFIED = 2 ** 11 +EVENT_JOB_EXECUTED = 2 ** 12 +EVENT_JOB_ERROR = 2 ** 13 +EVENT_JOB_MISSED = 2 ** 14 +EVENT_JOB_SUBMITTED = 2 ** 15 +EVENT_JOB_MAX_INSTANCES = 2 ** 16 +EVENT_ALL = (EVENT_SCHEDULER_STARTED | EVENT_SCHEDULER_SHUTDOWN | EVENT_SCHEDULER_PAUSED | + EVENT_SCHEDULER_RESUMED | EVENT_EXECUTOR_ADDED | EVENT_EXECUTOR_REMOVED | + EVENT_JOBSTORE_ADDED | EVENT_JOBSTORE_REMOVED | EVENT_ALL_JOBS_REMOVED | EVENT_JOB_ADDED | EVENT_JOB_REMOVED | EVENT_JOB_MODIFIED | EVENT_JOB_EXECUTED | - EVENT_JOB_ERROR | EVENT_JOB_MISSED) + EVENT_JOB_ERROR | EVENT_JOB_MISSED | EVENT_JOB_SUBMITTED | EVENT_JOB_MAX_INSTANCES) class SchedulerEvent(object): @@ -55,9 +63,21 @@ class JobEvent(SchedulerEvent): self.jobstore = jobstore +class JobSubmissionEvent(JobEvent): + """ + An event that concerns the submission of a job to its executor. 
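A minimal sketch of consuming these new submission events, assuming a ``scheduler`` instance is already in scope (the listener body is invented for illustration)::

    from apscheduler.events import EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES

    def on_submission(event):
        # event.scheduled_run_times lists every run time this submission covers
        print(event.job_id, event.scheduled_run_times)

    scheduler.add_listener(on_submission, EVENT_JOB_SUBMITTED | EVENT_JOB_MAX_INSTANCES)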
+ + :ivar scheduled_run_times: a list of datetimes when the job was intended to run + """ + + def __init__(self, code, job_id, jobstore, scheduled_run_times): + super(JobSubmissionEvent, self).__init__(code, job_id, jobstore) + self.scheduled_run_times = scheduled_run_times + + class JobExecutionEvent(JobEvent): """ - An event that concerns the execution of individual jobs. + An event that concerns the running of a job within its executor. :ivar scheduled_run_time: the time when the job was scheduled to be run :ivar retval: the return value of the successfully executed job @@ -65,7 +85,8 @@ class JobExecutionEvent(JobEvent): :ivar traceback: a formatted traceback for the exception """ - def __init__(self, code, job_id, jobstore, scheduled_run_time, retval=None, exception=None, traceback=None): + def __init__(self, code, job_id, jobstore, scheduled_run_time, retval=None, exception=None, + traceback=None): super(JobExecutionEvent, self).__init__(code, job_id, jobstore) self.scheduled_run_time = scheduled_run_time self.retval = retval diff --git a/lib/apscheduler/executors/asyncio.py b/lib/apscheduler/executors/asyncio.py index fade99f8..5139622d 100644 --- a/lib/apscheduler/executors/asyncio.py +++ b/lib/apscheduler/executors/asyncio.py @@ -1,28 +1,60 @@ from __future__ import absolute_import + import sys from apscheduler.executors.base import BaseExecutor, run_job +try: + from asyncio import iscoroutinefunction + from apscheduler.executors.base_py3 import run_coroutine_job +except ImportError: + from trollius import iscoroutinefunction + run_coroutine_job = None + class AsyncIOExecutor(BaseExecutor): """ Runs jobs in the default executor of the event loop. + If the job function is a native coroutine function, it is scheduled to be run directly in the + event loop as soon as possible. All other functions are run in the event loop's default + executor which is usually a thread pool. 
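The dispatch rule can be sketched as follows (both job functions are invented for illustration)::

    from asyncio import iscoroutinefunction

    async def coro_job(): ...
    def plain_job(): ...

    iscoroutinefunction(coro_job)   # True -> scheduled with create_task()
    iscoroutinefunction(plain_job)  # False -> run_in_executor() thread pool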
+ Plugin alias: ``asyncio`` """ def start(self, scheduler, alias): super(AsyncIOExecutor, self).start(scheduler, alias) self._eventloop = scheduler._eventloop + self._pending_futures = set() + + def shutdown(self, wait=True): + # There is no way to honor wait=True without converting this method into a coroutine method + for f in self._pending_futures: + if not f.done(): + f.cancel() + + self._pending_futures.clear() def _do_submit_job(self, job, run_times): def callback(f): + self._pending_futures.discard(f) try: events = f.result() - except: + except BaseException: self._run_job_error(job.id, *sys.exc_info()[1:]) else: self._run_job_success(job.id, events) - f = self._eventloop.run_in_executor(None, run_job, job, job._jobstore_alias, run_times, self._logger.name) + if iscoroutinefunction(job.func): + if run_coroutine_job is not None: + coro = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name) + f = self._eventloop.create_task(coro) + else: + raise Exception('Executing coroutine based jobs is not supported with Trollius') + else: + f = self._eventloop.run_in_executor(None, run_job, job, job._jobstore_alias, run_times, + self._logger.name) + f.add_done_callback(callback) + self._pending_futures.add(f) diff --git a/lib/apscheduler/executors/base.py b/lib/apscheduler/executors/base.py index 5a0a19eb..4c09fc11 100644 --- a/lib/apscheduler/executors/base.py +++ b/lib/apscheduler/executors/base.py @@ -8,13 +8,15 @@ import sys from pytz import utc import six -from apscheduler.events import JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED +from apscheduler.events import ( + JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED) class MaxInstancesReachedError(Exception): def __init__(self, job): super(MaxInstancesReachedError, self).__init__( - 'Job "%s" has already reached its maximum number of instances (%d)' % (job.id, job.max_instances)) + 'Job "%s" has already reached its maximum number of instances (%d)' % + (job.id, job.max_instances)) class BaseExecutor(six.with_metaclass(ABCMeta, object)): @@ -30,13 +32,14 @@ class BaseExecutor(six.with_metaclass(ABCMeta, object)): def start(self, scheduler, alias): """ - Called by the scheduler when the scheduler is being started or when the executor is being added to an already - running scheduler. + Called by the scheduler when the scheduler is being started or when the executor is being + added to an already running scheduler. - :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting this executor + :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting + this executor :param str|unicode alias: alias of this executor as it was assigned to the scheduler - """ + """ self._scheduler = scheduler self._lock = scheduler._create_lock() self._logger = logging.getLogger('apscheduler.executors.%s' % alias) @@ -45,7 +48,8 @@ class BaseExecutor(six.with_metaclass(ABCMeta, object)): """ Shuts down this executor. - :param bool wait: ``True`` to wait until all submitted jobs have been executed + :param bool wait: ``True`` to wait until all submitted jobs + have been executed """ def submit_job(self, job, run_times): @@ -53,10 +57,12 @@ class BaseExecutor(six.with_metaclass(ABCMeta, object)): Submits job for execution. 
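A sketch of the caller's side of this contract, assuming ``executor``, ``job`` and ``run_times`` are already bound (as they are in the scheduler's main loop)::

    from apscheduler.executors.base import MaxInstancesReachedError

    try:
        executor.submit_job(job, run_times)
    except MaxInstancesReachedError:
        # job.max_instances runs are already in flight; this run is skipped
        pass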
:param Job job: job to execute - :param list[datetime] run_times: list of datetimes specifying when the job should have been run - :raises MaxInstancesReachedError: if the maximum number of allowed instances for this job has been reached - """ + :param list[datetime] run_times: list of datetimes specifying + when the job should have been run + :raises MaxInstancesReachedError: if the maximum number of + allowed instances for this job has been reached + """ assert self._lock is not None, 'This executor has not been started yet' with self._lock: if self._instances[job.id] >= job.max_instances: @@ -70,50 +76,71 @@ class BaseExecutor(six.with_metaclass(ABCMeta, object)): """Performs the actual task of scheduling `run_job` to be called.""" def _run_job_success(self, job_id, events): - """Called by the executor with the list of generated events when `run_job` has been successfully called.""" + """ + Called by the executor with the list of generated events when :func:`run_job` has been + successfully called. + """ with self._lock: self._instances[job_id] -= 1 + if self._instances[job_id] == 0: + del self._instances[job_id] for event in events: self._scheduler._dispatch_event(event) def _run_job_error(self, job_id, exc, traceback=None): - """Called by the executor with the exception if there is an error calling `run_job`.""" - + """Called by the executor with the exception if there is an error calling `run_job`.""" with self._lock: self._instances[job_id] -= 1 + if self._instances[job_id] == 0: + del self._instances[job_id] exc_info = (exc.__class__, exc, traceback) self._logger.error('Error running job %s', job_id, exc_info=exc_info) def run_job(job, jobstore_alias, run_times, logger_name): - """Called by executors to run the job. Returns a list of scheduler events to be dispatched by the scheduler.""" + """ + Called by executors to run the job. Returns a list of scheduler events to be dispatched by the + scheduler. 
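A worked example of the misfire check below, with invented times: given a 30 second grace time, a run slotted for 12:00:00 but picked up 31 seconds late is reported as missed::

    from datetime import datetime, timedelta
    from pytz import utc

    run_time = datetime(2018, 1, 1, 12, 0, 0, tzinfo=utc)
    now = datetime(2018, 1, 1, 12, 0, 31, tzinfo=utc)
    now - run_time > timedelta(seconds=30)  # True -> EVENT_JOB_MISSED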
+ """ events = [] logger = logging.getLogger(logger_name) for run_time in run_times: - # See if the job missed its run time window, and handle possible misfires accordingly + # See if the job missed its run time window, and handle + # possible misfires accordingly if job.misfire_grace_time is not None: difference = datetime.now(utc) - run_time grace_time = timedelta(seconds=job.misfire_grace_time) if difference > grace_time: - events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias, run_time)) + events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias, + run_time)) logger.warning('Run time of job "%s" was missed by %s', job, difference) continue logger.info('Running job "%s" (scheduled at %s)', job, run_time) try: retval = job.func(*job.args, **job.kwargs) - except: + except BaseException: exc, tb = sys.exc_info()[1:] formatted_tb = ''.join(format_tb(tb)) - events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time, exception=exc, - traceback=formatted_tb)) + events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time, + exception=exc, traceback=formatted_tb)) logger.exception('Job "%s" raised an exception', job) + + # This is to prevent cyclic references that would lead to memory leaks + if six.PY2: + sys.exc_clear() + del tb + else: + import traceback + traceback.clear_frames(tb) + del tb else: - events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, retval=retval)) + events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, + retval=retval)) logger.info('Job "%s" executed successfully', job) return events diff --git a/lib/apscheduler/executors/base_py3.py b/lib/apscheduler/executors/base_py3.py new file mode 100644 index 00000000..61abd842 --- /dev/null +++ b/lib/apscheduler/executors/base_py3.py @@ -0,0 +1,41 @@ +import logging +import sys +from datetime import datetime, timedelta +from traceback import format_tb + +from pytz import utc + +from apscheduler.events import ( + JobExecutionEvent, EVENT_JOB_MISSED, EVENT_JOB_ERROR, EVENT_JOB_EXECUTED) + + +async def run_coroutine_job(job, jobstore_alias, run_times, logger_name): + """Coroutine version of run_job().""" + events = [] + logger = logging.getLogger(logger_name) + for run_time in run_times: + # See if the job missed its run time window, and handle possible misfires accordingly + if job.misfire_grace_time is not None: + difference = datetime.now(utc) - run_time + grace_time = timedelta(seconds=job.misfire_grace_time) + if difference > grace_time: + events.append(JobExecutionEvent(EVENT_JOB_MISSED, job.id, jobstore_alias, + run_time)) + logger.warning('Run time of job "%s" was missed by %s', job, difference) + continue + + logger.info('Running job "%s" (scheduled at %s)', job, run_time) + try: + retval = await job.func(*job.args, **job.kwargs) + except BaseException: + exc, tb = sys.exc_info()[1:] + formatted_tb = ''.join(format_tb(tb)) + events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time, + exception=exc, traceback=formatted_tb)) + logger.exception('Job "%s" raised an exception', job) + else: + events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, + retval=retval)) + logger.info('Job "%s" executed successfully', job) + + return events diff --git a/lib/apscheduler/executors/debug.py b/lib/apscheduler/executors/debug.py index 1f6f6b1a..ac739aeb 100644 --- a/lib/apscheduler/executors/debug.py +++ b/lib/apscheduler/executors/debug.py @@ 
-5,7 +5,8 @@ from apscheduler.executors.base import BaseExecutor, run_job class DebugExecutor(BaseExecutor): """ - A special executor that executes the target callable directly instead of deferring it to a thread or process. + A special executor that executes the target callable directly instead of deferring it to a + thread or process. Plugin alias: ``debug`` """ @@ -13,7 +14,7 @@ class DebugExecutor(BaseExecutor): def _do_submit_job(self, job, run_times): try: events = run_job(job, job._jobstore_alias, run_times, self._logger.name) - except: + except BaseException: self._run_job_error(job.id, *sys.exc_info()[1:]) else: self._run_job_success(job.id, events) diff --git a/lib/apscheduler/executors/gevent.py b/lib/apscheduler/executors/gevent.py index 9f4db2fc..1235bb6e 100644 --- a/lib/apscheduler/executors/gevent.py +++ b/lib/apscheduler/executors/gevent.py @@ -21,9 +21,10 @@ class GeventExecutor(BaseExecutor): def callback(greenlet): try: events = greenlet.get() - except: + except BaseException: self._run_job_error(job.id, *sys.exc_info()[1:]) else: self._run_job_success(job.id, events) - gevent.spawn(run_job, job, job._jobstore_alias, run_times, self._logger.name).link(callback) + gevent.spawn(run_job, job, job._jobstore_alias, run_times, self._logger.name).\ + link(callback) diff --git a/lib/apscheduler/executors/tornado.py b/lib/apscheduler/executors/tornado.py new file mode 100644 index 00000000..a4696ce7 --- /dev/null +++ b/lib/apscheduler/executors/tornado.py @@ -0,0 +1,54 @@ +from __future__ import absolute_import + +import sys +from concurrent.futures import ThreadPoolExecutor + +from tornado.gen import convert_yielded + +from apscheduler.executors.base import BaseExecutor, run_job + +try: + from inspect import iscoroutinefunction + from apscheduler.executors.base_py3 import run_coroutine_job +except ImportError: + def iscoroutinefunction(func): + return False + + +class TornadoExecutor(BaseExecutor): + """ + Runs jobs either in a thread pool or directly on the I/O loop. + + If the job function is a native coroutine function, it is scheduled to be run directly in the + I/O loop as soon as possible. All other functions are run in a thread pool. 
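A usage sketch, assuming a TornadoScheduler (which provides the ``_ioloop`` this executor picks up); the job function and interval are invented::

    from tornado.ioloop import IOLoop
    from apscheduler.schedulers.tornado import TornadoScheduler

    async def poll():
        print('polling')

    scheduler = TornadoScheduler()
    scheduler.add_job(poll, 'interval', seconds=30)
    scheduler.start()
    IOLoop.current().start()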
+ + Plugin alias: ``tornado`` + + :param int max_workers: maximum number of worker threads in the thread pool + """ + + def __init__(self, max_workers=10): + super(TornadoExecutor, self).__init__() + self.executor = ThreadPoolExecutor(max_workers) + + def start(self, scheduler, alias): + super(TornadoExecutor, self).start(scheduler, alias) + self._ioloop = scheduler._ioloop + + def _do_submit_job(self, job, run_times): + def callback(f): + try: + events = f.result() + except BaseException: + self._run_job_error(job.id, *sys.exc_info()[1:]) + else: + self._run_job_success(job.id, events) + + if iscoroutinefunction(job.func): + f = run_coroutine_job(job, job._jobstore_alias, run_times, self._logger.name) + else: + f = self.executor.submit(run_job, job, job._jobstore_alias, run_times, + self._logger.name) + + f = convert_yielded(f) + f.add_done_callback(callback) diff --git a/lib/apscheduler/executors/twisted.py b/lib/apscheduler/executors/twisted.py index 29217221..c7bcf647 100644 --- a/lib/apscheduler/executors/twisted.py +++ b/lib/apscheduler/executors/twisted.py @@ -21,5 +21,5 @@ class TwistedExecutor(BaseExecutor): else: self._run_job_error(job.id, result.value, result.tb) - self._reactor.getThreadPool().callInThreadWithCallback(callback, run_job, job, job._jobstore_alias, run_times, - self._logger.name) + self._reactor.getThreadPool().callInThreadWithCallback( + callback, run_job, job, job._jobstore_alias, run_times, self._logger.name) diff --git a/lib/apscheduler/job.py b/lib/apscheduler/job.py index f5639dae..b9c305db 100644 --- a/lib/apscheduler/job.py +++ b/lib/apscheduler/job.py @@ -4,8 +4,9 @@ from uuid import uuid4 import six from apscheduler.triggers.base import BaseTrigger -from apscheduler.util import ref_to_obj, obj_to_ref, datetime_repr, repr_escape, get_callable_name, check_callable_args, \ - convert_to_datetime +from apscheduler.util import ( + ref_to_obj, obj_to_ref, datetime_repr, repr_escape, get_callable_name, check_callable_args, + convert_to_datetime) class Job(object): @@ -21,13 +22,20 @@ class Job(object): :var bool coalesce: whether to only run the job once when several run times are due :var trigger: the trigger object that controls the schedule of this job :var str executor: the name of the executor that will run this job - :var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to be late - :var int max_instances: the maximum number of concurrently executing instances allowed for this job + :var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to + be late + :var int max_instances: the maximum number of concurrently executing instances allowed for this + job :var datetime.datetime next_run_time: the next scheduled run time of this job + + .. note:: + The ``misfire_grace_time`` has some non-obvious effects on job execution. See the + :ref:`missed-job-executions` section in the documentation for an in-depth explanation. 
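These attributes are normally set through ``add_job()``; a sketch with an invented function and schedule, assuming ``scheduler`` is in scope::

    def refresh_cache():
        pass

    scheduler.add_job(refresh_cache, 'interval', minutes=5,
                      misfire_grace_time=60, coalesce=True, max_instances=2)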
""" - __slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref', 'args', 'kwargs', - 'name', 'misfire_grace_time', 'coalesce', 'max_instances', 'next_run_time') + __slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref', + 'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances', + 'next_run_time') def __init__(self, scheduler, id=None, **kwargs): super(Job, self).__init__() @@ -38,53 +46,69 @@ class Job(object): def modify(self, **changes): """ Makes the given changes to this job and saves it in the associated job store. + Accepted keyword arguments are the same as the variables on this class. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.modify_job` - """ + :return Job: this job instance + + """ self._scheduler.modify_job(self.id, self._jobstore_alias, **changes) + return self def reschedule(self, trigger, **trigger_args): """ Shortcut for switching the trigger on this job. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.reschedule_job` - """ + :return Job: this job instance + + """ self._scheduler.reschedule_job(self.id, self._jobstore_alias, trigger, **trigger_args) + return self def pause(self): """ Temporarily suspend the execution of this job. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.pause_job` - """ + :return Job: this job instance + + """ self._scheduler.pause_job(self.id, self._jobstore_alias) + return self def resume(self): """ Resume the schedule of this job if previously paused. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.resume_job` - """ + :return Job: this job instance + + """ self._scheduler.resume_job(self.id, self._jobstore_alias) + return self def remove(self): """ Unschedules this job and removes it from its associated job store. .. seealso:: :meth:`~apscheduler.schedulers.base.BaseScheduler.remove_job` - """ + """ self._scheduler.remove_job(self.id, self._jobstore_alias) @property def pending(self): - """Returns ``True`` if the referenced job is still waiting to be added to its designated job store.""" + """ + Returns ``True`` if the referenced job is still waiting to be added to its designated job + store. + """ return self._jobstore_alias is None # @@ -97,8 +121,8 @@ class Job(object): :type now: datetime.datetime :rtype: list[datetime.datetime] - """ + """ run_times = [] next_run_time = self.next_run_time while next_run_time and next_run_time <= now: @@ -108,8 +132,11 @@ class Job(object): return run_times def _modify(self, **changes): - """Validates the changes to the Job and makes the modifications if and only if all of them validate.""" + """ + Validates the changes to the Job and makes the modifications if and only if all of them + validate. 
+ """ approved = {} if 'id' in changes: @@ -125,7 +152,7 @@ class Job(object): args = changes.pop('args') if 'args' in changes else self.args kwargs = changes.pop('kwargs') if 'kwargs' in changes else self.kwargs - if isinstance(func, str): + if isinstance(func, six.string_types): func_ref = func func = ref_to_obj(func) elif callable(func): @@ -177,7 +204,8 @@ class Job(object): if 'trigger' in changes: trigger = changes.pop('trigger') if not isinstance(trigger, BaseTrigger): - raise TypeError('Expected a trigger instance, got %s instead' % trigger.__class__.__name__) + raise TypeError('Expected a trigger instance, got %s instead' % + trigger.__class__.__name__) approved['trigger'] = trigger @@ -189,10 +217,12 @@ class Job(object): if 'next_run_time' in changes: value = changes.pop('next_run_time') - approved['next_run_time'] = convert_to_datetime(value, self._scheduler.timezone, 'next_run_time') + approved['next_run_time'] = convert_to_datetime(value, self._scheduler.timezone, + 'next_run_time') if changes: - raise AttributeError('The following are not modifiable attributes of Job: %s' % ', '.join(changes)) + raise AttributeError('The following are not modifiable attributes of Job: %s' % + ', '.join(changes)) for key, value in six.iteritems(approved): setattr(self, key, value) @@ -200,9 +230,10 @@ class Job(object): def __getstate__(self): # Don't allow this Job to be serialized if the function reference could not be determined if not self.func_ref: - raise ValueError('This Job cannot be serialized since the reference to its callable (%r) could not be ' - 'determined. Consider giving a textual reference (module:function name) instead.' % - (self.func,)) + raise ValueError( + 'This Job cannot be serialized since the reference to its callable (%r) could not ' + 'be determined. Consider giving a textual reference (module:function name) ' + 'instead.' 
% (self.func,)) return { 'version': 1, @@ -221,7 +252,8 @@ class Job(object): def __setstate__(self, state): if state.get('version', 1) > 1: - raise ValueError('Job has version %s, but only version 1 can be handled' % state['version']) + raise ValueError('Job has version %s, but only version 1 can be handled' % + state['version']) self.id = state['id'] self.func_ref = state['func'] @@ -245,8 +277,13 @@ class Job(object): return '' % (repr_escape(self.id), repr_escape(self.name)) def __str__(self): - return '%s (trigger: %s, next run at: %s)' % (repr_escape(self.name), repr_escape(str(self.trigger)), - datetime_repr(self.next_run_time)) + return repr_escape(self.__unicode__()) def __unicode__(self): - return six.u('%s (trigger: %s, next run at: %s)') % (self.name, self.trigger, datetime_repr(self.next_run_time)) + if hasattr(self, 'next_run_time'): + status = ('next run at: ' + datetime_repr(self.next_run_time) if + self.next_run_time else 'paused') + else: + status = 'pending' + + return u'%s (trigger: %s, %s)' % (self.name, self.trigger, status) diff --git a/lib/apscheduler/jobstores/base.py b/lib/apscheduler/jobstores/base.py index d9f7147b..9cff66c4 100644 --- a/lib/apscheduler/jobstores/base.py +++ b/lib/apscheduler/jobstores/base.py @@ -8,23 +8,27 @@ class JobLookupError(KeyError): """Raised when the job store cannot find a job for update or removal.""" def __init__(self, job_id): - super(JobLookupError, self).__init__(six.u('No job by the id of %s was found') % job_id) + super(JobLookupError, self).__init__(u'No job by the id of %s was found' % job_id) class ConflictingIdError(KeyError): """Raised when the uniqueness of job IDs is being violated.""" def __init__(self, job_id): - super(ConflictingIdError, self).__init__(six.u('Job identifier (%s) conflicts with an existing job') % job_id) + super(ConflictingIdError, self).__init__( + u'Job identifier (%s) conflicts with an existing job' % job_id) class TransientJobError(ValueError): - """Raised when an attempt to add transient (with no func_ref) job to a persistent job store is detected.""" + """ + Raised when an attempt to add transient (with no func_ref) job to a persistent job store is + detected. + """ def __init__(self, job_id): super(TransientJobError, self).__init__( - six.u('Job (%s) cannot be added to this job store because a reference to the callable could not be ' - 'determined.') % job_id) + u'Job (%s) cannot be added to this job store because a reference to the callable ' + u'could not be determined.' % job_id) class BaseJobStore(six.with_metaclass(ABCMeta)): @@ -36,10 +40,11 @@ class BaseJobStore(six.with_metaclass(ABCMeta)): def start(self, scheduler, alias): """ - Called by the scheduler when the scheduler is being started or when the job store is being added to an already - running scheduler. + Called by the scheduler when the scheduler is being started or when the job store is being + added to an already running scheduler. 
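Job stores receive this call both at scheduler start and when attached to a live scheduler; a sketch with an invented alias::

    from apscheduler.jobstores.memory import MemoryJobStore

    scheduler.start()
    scheduler.add_jobstore(MemoryJobStore(), alias='transient')  # start() runs here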
- :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting this job store + :param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting + this job store :param str|unicode alias: alias of this job store as it was assigned to the scheduler """ @@ -50,13 +55,22 @@ class BaseJobStore(six.with_metaclass(ABCMeta)): def shutdown(self): """Frees any resources still bound to this job store.""" + def _fix_paused_jobs_sorting(self, jobs): + for i, job in enumerate(jobs): + if job.next_run_time is not None: + if i > 0: + paused_jobs = jobs[:i] + del jobs[:i] + jobs.extend(paused_jobs) + break + @abstractmethod def lookup_job(self, job_id): """ Returns a specific job, or ``None`` if it isn't found.. - The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of the returned job to - point to the scheduler and itself, respectively. + The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of + the returned job to point to the scheduler and itself, respectively. :param str|unicode job_id: identifier of the job :rtype: Job @@ -75,7 +89,8 @@ class BaseJobStore(six.with_metaclass(ABCMeta)): @abstractmethod def get_next_run_time(self): """ - Returns the earliest run time of all the jobs stored in this job store, or ``None`` if there are no active jobs. + Returns the earliest run time of all the jobs stored in this job store, or ``None`` if + there are no active jobs. :rtype: datetime.datetime """ @@ -83,11 +98,12 @@ class BaseJobStore(six.with_metaclass(ABCMeta)): @abstractmethod def get_all_jobs(self): """ - Returns a list of all jobs in this job store. The returned jobs should be sorted by next run time (ascending). - Paused jobs (next_run_time is None) should be sorted last. + Returns a list of all jobs in this job store. + The returned jobs should be sorted by next run time (ascending). + Paused jobs (next_run_time == None) should be sorted last. - The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of the returned jobs to - point to the scheduler and itself, respectively. + The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of + the returned jobs to point to the scheduler and itself, respectively. :rtype: list[Job] """ diff --git a/lib/apscheduler/jobstores/memory.py b/lib/apscheduler/jobstores/memory.py index 645391f3..abfe7c6c 100644 --- a/lib/apscheduler/jobstores/memory.py +++ b/lib/apscheduler/jobstores/memory.py @@ -13,7 +13,8 @@ class MemoryJobStore(BaseJobStore): def __init__(self): super(MemoryJobStore, self).__init__() - self._jobs = [] # list of (job, timestamp), sorted by next_run_time and job id (ascending) + # list of (job, timestamp), sorted by next_run_time and job id (ascending) + self._jobs = [] self._jobs_index = {} # id -> (job, timestamp) lookup table def lookup_job(self, job_id): @@ -80,13 +81,13 @@ class MemoryJobStore(BaseJobStore): def _get_job_index(self, timestamp, job_id): """ - Returns the index of the given job, or if it's not found, the index where the job should be inserted based on - the given timestamp. + Returns the index of the given job, or if it's not found, the index where the job should be + inserted based on the given timestamp. 
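The ordering this search relies on can be reproduced with the standard ``bisect`` module (a conceptual equivalent, not the actual implementation)::

    import bisect

    # (timestamp, job_id) pairs; paused jobs sort last via infinity
    keys = [(1.0, 'a'), (2.0, 'b'), (float('inf'), 'paused')]
    bisect.bisect_left(keys, (2.0, 'b'))  # -> 1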
:type timestamp: int :type job_id: str - """ + """ lo, hi = 0, len(self._jobs) timestamp = float('inf') if timestamp is None else timestamp while lo < hi: diff --git a/lib/apscheduler/jobstores/mongodb.py b/lib/apscheduler/jobstores/mongodb.py index ff762f7e..7dbc3b12 100644 --- a/lib/apscheduler/jobstores/mongodb.py +++ b/lib/apscheduler/jobstores/mongodb.py @@ -1,4 +1,5 @@ from __future__ import absolute_import +import warnings from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime @@ -19,16 +20,18 @@ except ImportError: # pragma: nocover class MongoDBJobStore(BaseJobStore): """ - Stores jobs in a MongoDB database. Any leftover keyword arguments are directly passed to pymongo's `MongoClient + Stores jobs in a MongoDB database. Any leftover keyword arguments are directly passed to + pymongo's `MongoClient `_. Plugin alias: ``mongodb`` :param str database: database to store jobs in :param str collection: collection to store jobs in - :param client: a :class:`~pymongo.mongo_client.MongoClient` instance to use instead of providing connection - arguments - :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the highest available + :param client: a :class:`~pymongo.mongo_client.MongoClient` instance to use instead of + providing connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available """ def __init__(self, database='apscheduler', collection='jobs', client=None, @@ -42,14 +45,23 @@ class MongoDBJobStore(BaseJobStore): raise ValueError('The "collection" parameter must not be empty') if client: - self.connection = maybe_ref(client) + self.client = maybe_ref(client) else: connect_args.setdefault('w', 1) - self.connection = MongoClient(**connect_args) + self.client = MongoClient(**connect_args) - self.collection = self.connection[database][collection] + self.collection = self.client[database][collection] + + def start(self, scheduler, alias): + super(MongoDBJobStore, self).start(scheduler, alias) self.collection.ensure_index('next_run_time', sparse=True) + @property + def connection(self): + warnings.warn('The "connection" member is deprecated -- use "client" instead', + DeprecationWarning) + return self.client + def lookup_job(self, job_id): document = self.collection.find_one(job_id, ['job_state']) return self._reconstitute_job(document['job_state']) if document else None @@ -59,12 +71,15 @@ class MongoDBJobStore(BaseJobStore): return self._get_jobs({'next_run_time': {'$lte': timestamp}}) def get_next_run_time(self): - document = self.collection.find_one({'next_run_time': {'$ne': None}}, fields=['next_run_time'], + document = self.collection.find_one({'next_run_time': {'$ne': None}}, + projection=['next_run_time'], sort=[('next_run_time', ASCENDING)]) return utc_timestamp_to_datetime(document['next_run_time']) if document else None def get_all_jobs(self): - return self._get_jobs({}) + jobs = self._get_jobs({}) + self._fix_paused_jobs_sorting(jobs) + return jobs def add_job(self, job): try: @@ -83,7 +98,7 @@ class MongoDBJobStore(BaseJobStore): } result = self.collection.update({'_id': job.id}, {'$set': changes}) if result and result['n'] == 0: - raise JobLookupError(id) + raise JobLookupError(job.id) def remove_job(self, job_id): result = self.collection.remove(job_id) @@ -94,7 +109,7 @@ class MongoDBJobStore(BaseJobStore): self.collection.remove() def 
shutdown(self): - self.connection.disconnect() + self.client.close() def _reconstitute_job(self, job_state): job_state = pickle.loads(job_state) @@ -107,11 +122,13 @@ class MongoDBJobStore(BaseJobStore): def _get_jobs(self, conditions): jobs = [] failed_job_ids = [] - for document in self.collection.find(conditions, ['_id', 'job_state'], sort=[('next_run_time', ASCENDING)]): + for document in self.collection.find(conditions, ['_id', 'job_state'], + sort=[('next_run_time', ASCENDING)]): try: jobs.append(self._reconstitute_job(document['job_state'])) - except: - self._logger.exception('Unable to restore job "%s" -- removing it', document['_id']) + except BaseException: + self._logger.exception('Unable to restore job "%s" -- removing it', + document['_id']) failed_job_ids.append(document['_id']) # Remove all the jobs we failed to restore @@ -121,4 +138,4 @@ class MongoDBJobStore(BaseJobStore): return jobs def __repr__(self): - return '<%s (client=%s)>' % (self.__class__.__name__, self.connection) + return '<%s (client=%s)>' % (self.__class__.__name__, self.client) diff --git a/lib/apscheduler/jobstores/redis.py b/lib/apscheduler/jobstores/redis.py index 2b4ffd52..61f913e9 100644 --- a/lib/apscheduler/jobstores/redis.py +++ b/lib/apscheduler/jobstores/redis.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +from datetime import datetime +from pytz import utc import six from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError @@ -19,14 +21,16 @@ except ImportError: # pragma: nocover class RedisJobStore(BaseJobStore): """ - Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's StrictRedis. + Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's + :class:`~redis.StrictRedis`. 
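A configuration sketch with invented connection details; ``host`` here is one of the leftover keyword arguments forwarded to StrictRedis::

    from apscheduler.jobstores.redis import RedisJobStore
    from apscheduler.schedulers.background import BackgroundScheduler

    jobstores = {'default': RedisJobStore(db=2, host='localhost')}
    scheduler = BackgroundScheduler(jobstores=jobstores)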
Plugin alias: ``redis`` :param int db: the database number to store jobs in :param str jobs_key: key to store jobs in :param str run_times_key: key to store the jobs' run times in - :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the highest available + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available """ def __init__(self, db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times', @@ -65,7 +69,8 @@ class RedisJobStore(BaseJobStore): def get_all_jobs(self): job_states = self.redis.hgetall(self.jobs_key) jobs = self._reconstitute_jobs(six.iteritems(job_states)) - return sorted(jobs, key=lambda job: job.next_run_time) + paused_sort_key = datetime(9999, 12, 31, tzinfo=utc) + return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key) def add_job(self, job): if self.redis.hexists(self.jobs_key, job.id): @@ -73,8 +78,10 @@ class RedisJobStore(BaseJobStore): with self.redis.pipeline() as pipe: pipe.multi() - pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), self.pickle_protocol)) - pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id) + pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), + self.pickle_protocol)) + if job.next_run_time: + pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id) pipe.execute() def update_job(self, job): @@ -82,7 +89,8 @@ class RedisJobStore(BaseJobStore): raise JobLookupError(job.id) with self.redis.pipeline() as pipe: - pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), self.pickle_protocol)) + pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), + self.pickle_protocol)) if job.next_run_time: pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id) else: @@ -121,7 +129,7 @@ class RedisJobStore(BaseJobStore): for job_id, job_state in job_states: try: jobs.append(self._reconstitute_job(job_state)) - except: + except BaseException: self._logger.exception('Unable to restore job "%s" -- removing it', job_id) failed_job_ids.append(job_id) diff --git a/lib/apscheduler/jobstores/rethinkdb.py b/lib/apscheduler/jobstores/rethinkdb.py new file mode 100644 index 00000000..2185c6cc --- /dev/null +++ b/lib/apscheduler/jobstores/rethinkdb.py @@ -0,0 +1,153 @@ +from __future__ import absolute_import + +from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError +from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime +from apscheduler.job import Job + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +try: + import rethinkdb as r +except ImportError: # pragma: nocover + raise ImportError('RethinkDBJobStore requires rethinkdb installed') + + +class RethinkDBJobStore(BaseJobStore): + """ + Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to + rethinkdb's `RethinkdbClient `_. 
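A configuration sketch with invented names; ``host`` is forwarded to ``rethinkdb.connect()``::

    from apscheduler.jobstores.rethinkdb import RethinkDBJobStore

    jobstore = RethinkDBJobStore(database='apscheduler', table='jobs', host='localhost')
    scheduler.add_jobstore(jobstore, 'rethink')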
+ + Plugin alias: ``rethinkdb`` + + :param str database: database to store jobs in + :param str collection: collection to store jobs in + :param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing + connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__(self, database='apscheduler', table='jobs', client=None, + pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args): + super(RethinkDBJobStore, self).__init__() + + if not database: + raise ValueError('The "database" parameter must not be empty') + if not table: + raise ValueError('The "table" parameter must not be empty') + + self.database = database + self.table = table + self.client = client + self.pickle_protocol = pickle_protocol + self.connect_args = connect_args + self.conn = None + + def start(self, scheduler, alias): + super(RethinkDBJobStore, self).start(scheduler, alias) + + if self.client: + self.conn = maybe_ref(self.client) + else: + self.conn = r.connect(db=self.database, **self.connect_args) + + if self.database not in r.db_list().run(self.conn): + r.db_create(self.database).run(self.conn) + + if self.table not in r.table_list().run(self.conn): + r.table_create(self.table).run(self.conn) + + if 'next_run_time' not in r.table(self.table).index_list().run(self.conn): + r.table(self.table).index_create('next_run_time').run(self.conn) + + self.table = r.db(self.database).table(self.table) + + def lookup_job(self, job_id): + results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn)) + return self._reconstitute_job(results[0]['job_state']) if results else None + + def get_due_jobs(self, now): + return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now)) + + def get_next_run_time(self): + results = list( + self.table + .filter(r.row['next_run_time'] != None) # flake8: noqa + .order_by(r.asc('next_run_time')) + .map(lambda x: x['next_run_time']) + .limit(1) + .run(self.conn) + ) + return utc_timestamp_to_datetime(results[0]) if results else None + + def get_all_jobs(self): + jobs = self._get_jobs() + self._fix_paused_jobs_sorting(jobs) + return jobs + + def add_job(self, job): + job_dict = { + 'id': job.id, + 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), + 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) + } + results = self.table.insert(job_dict).run(self.conn) + if results['errors'] > 0: + raise ConflictingIdError(job.id) + + def update_job(self, job): + changes = { + 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), + 'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) + } + results = self.table.get_all(job.id).update(changes).run(self.conn) + skipped = False in map(lambda x: results[x] == 0, results.keys()) + if results['skipped'] > 0 or results['errors'] > 0 or not skipped: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + results = self.table.get_all(job_id).delete().run(self.conn) + if results['deleted'] + results['skipped'] != 1: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + self.table.delete().run(self.conn) + + def shutdown(self): + self.conn.close() + + def _reconstitute_job(self, job_state): + job_state = pickle.loads(job_state) + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self, predicate=None): + jobs = [] + failed_job_ids 
= [] + query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if + predicate else self.table) + query = query.order_by('next_run_time', 'id').pluck('id', 'job_state') + + for document in query.run(self.conn): + try: + jobs.append(self._reconstitute_job(document['job_state'])) + except: + self._logger.exception('Unable to restore job "%s" -- removing it', document['id']) + failed_job_ids.append(document['id']) + + # Remove all the jobs we failed to restore + if failed_job_ids: + r.expr(failed_job_ids).for_each( + lambda job_id: self.table.get_all(job_id).delete()).run(self.conn) + + return jobs + + def __repr__(self): + connection = self.conn + return '<%s (connection=%s)>' % (self.__class__.__name__, connection) diff --git a/lib/apscheduler/jobstores/sqlalchemy.py b/lib/apscheduler/jobstores/sqlalchemy.py index f8a3c151..beb27fb5 100644 --- a/lib/apscheduler/jobstores/sqlalchemy.py +++ b/lib/apscheduler/jobstores/sqlalchemy.py @@ -10,29 +10,38 @@ except ImportError: # pragma: nocover import pickle try: - from sqlalchemy import create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select + from sqlalchemy import ( + create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select) from sqlalchemy.exc import IntegrityError + from sqlalchemy.sql.expression import null except ImportError: # pragma: nocover raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed') class SQLAlchemyJobStore(BaseJobStore): """ - Stores jobs in a database table using SQLAlchemy. The table will be created if it doesn't exist in the database. + Stores jobs in a database table using SQLAlchemy. + The table will be created if it doesn't exist in the database. Plugin alias: ``sqlalchemy`` - :param str url: connection string (see `SQLAlchemy documentation - `_ - on this) - :param engine: an SQLAlchemy Engine to use instead of creating a new one based on ``url`` + :param str url: connection string (see + :ref:`SQLAlchemy documentation ` on this) + :param engine: an SQLAlchemy :class:`~sqlalchemy.engine.Engine` to use instead of creating a + new one based on ``url`` :param str tablename: name of the table to store jobs in - :param metadata: a :class:`~sqlalchemy.MetaData` instance to use instead of creating a new one - :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the highest available + :param metadata: a :class:`~sqlalchemy.schema.MetaData` instance to use instead of creating a + new one + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + :param str tableschema: name of the (existing) schema in the target database where the table + should be + :param dict engine_options: keyword arguments to :func:`~sqlalchemy.create_engine` + (ignored if ``engine`` is given) """ def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None, - pickle_protocol=pickle.HIGHEST_PROTOCOL): + pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None): super(SQLAlchemyJobStore, self).__init__() self.pickle_protocol = pickle_protocol metadata = maybe_ref(metadata) or MetaData() @@ -40,18 +49,22 @@ class SQLAlchemyJobStore(BaseJobStore): if engine: self.engine = maybe_ref(engine) elif url: - self.engine = create_engine(url) + self.engine = create_engine(url, **(engine_options or {})) else: raise ValueError('Need either "engine" or "url" defined') - # 191 = max key length in MySQL for InnoDB/utf8mb4 tables, 25 = precision that 
translates to an 8-byte float + # 191 = max key length in MySQL for InnoDB/utf8mb4 tables, + # 25 = precision that translates to an 8-byte float self.jobs_t = Table( tablename, metadata, Column('id', Unicode(191, _warn_on_bytestring=False), primary_key=True), Column('next_run_time', Float(25), index=True), - Column('job_state', LargeBinary, nullable=False) + Column('job_state', LargeBinary, nullable=False), + schema=tableschema ) + def start(self, scheduler, alias): + super(SQLAlchemyJobStore, self).start(scheduler, alias) self.jobs_t.create(self.engine, True) def lookup_job(self, job_id): @@ -64,13 +77,16 @@ class SQLAlchemyJobStore(BaseJobStore): return self._get_jobs(self.jobs_t.c.next_run_time <= timestamp) def get_next_run_time(self): - selectable = select([self.jobs_t.c.next_run_time]).where(self.jobs_t.c.next_run_time != None).\ + selectable = select([self.jobs_t.c.next_run_time]).\ + where(self.jobs_t.c.next_run_time != null()).\ order_by(self.jobs_t.c.next_run_time).limit(1) next_run_time = self.engine.execute(selectable).scalar() return utc_timestamp_to_datetime(next_run_time) def get_all_jobs(self): - return self._get_jobs() + jobs = self._get_jobs() + self._fix_paused_jobs_sorting(jobs) + return jobs def add_job(self, job): insert = self.jobs_t.insert().values(**{ @@ -116,13 +132,14 @@ class SQLAlchemyJobStore(BaseJobStore): def _get_jobs(self, *conditions): jobs = [] - selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).order_by(self.jobs_t.c.next_run_time) + selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\ + order_by(self.jobs_t.c.next_run_time) selectable = selectable.where(*conditions) if conditions else selectable failed_job_ids = set() for row in self.engine.execute(selectable): try: jobs.append(self._reconstitute_job(row.job_state)) - except: + except BaseException: self._logger.exception('Unable to restore job "%s" -- removing it', row.id) failed_job_ids.add(row.id) diff --git a/lib/apscheduler/jobstores/zookeeper.py b/lib/apscheduler/jobstores/zookeeper.py new file mode 100644 index 00000000..2cca83e8 --- /dev/null +++ b/lib/apscheduler/jobstores/zookeeper.py @@ -0,0 +1,179 @@ +from __future__ import absolute_import + +import os +from datetime import datetime + +from pytz import utc +from kazoo.exceptions import NoNodeError, NodeExistsError + +from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError +from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime +from apscheduler.job import Job + +try: + import cPickle as pickle +except ImportError: # pragma: nocover + import pickle + +try: + from kazoo.client import KazooClient +except ImportError: # pragma: nocover + raise ImportError('ZooKeeperJobStore requires Kazoo installed') + + +class ZooKeeperJobStore(BaseJobStore): + """ + Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to + kazoo's `KazooClient + `_. 
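A configuration sketch with an invented connection string; ``hosts`` is forwarded to KazooClient::

    from apscheduler.jobstores.zookeeper import ZooKeeperJobStore

    jobstore = ZooKeeperJobStore(path='/apscheduler', hosts='127.0.0.1:2181')
    scheduler.add_jobstore(jobstore, 'zk')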
+ + Plugin alias: ``zookeeper`` + + :param str path: path to store jobs in + :param client: a :class:`~kazoo.client.KazooClient` instance to use instead of + providing connection arguments + :param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the + highest available + """ + + def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False, + pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args): + super(ZooKeeperJobStore, self).__init__() + self.pickle_protocol = pickle_protocol + self.close_connection_on_exit = close_connection_on_exit + + if not path: + raise ValueError('The "path" parameter must not be empty') + + self.path = path + + if client: + self.client = maybe_ref(client) + else: + self.client = KazooClient(**connect_args) + self._ensured_path = False + + def _ensure_paths(self): + if not self._ensured_path: + self.client.ensure_path(self.path) + self._ensured_path = True + + def start(self, scheduler, alias): + super(ZooKeeperJobStore, self).start(scheduler, alias) + if not self.client.connected: + self.client.start() + + def lookup_job(self, job_id): + self._ensure_paths() + node_path = os.path.join(self.path, job_id) + try: + content, _ = self.client.get(node_path) + doc = pickle.loads(content) + job = self._reconstitute_job(doc['job_state']) + return job + except BaseException: + return None + + def get_due_jobs(self, now): + timestamp = datetime_to_utc_timestamp(now) + jobs = [job_def['job'] for job_def in self._get_jobs() + if job_def['next_run_time'] is not None and job_def['next_run_time'] <= timestamp] + return jobs + + def get_next_run_time(self): + next_runs = [job_def['next_run_time'] for job_def in self._get_jobs() + if job_def['next_run_time'] is not None] + return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None + + def get_all_jobs(self): + jobs = [job_def['job'] for job_def in self._get_jobs()] + self._fix_paused_jobs_sorting(jobs) + return jobs + + def add_job(self, job): + self._ensure_paths() + node_path = os.path.join(self.path, str(job.id)) + value = { + 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), + 'job_state': job.__getstate__() + } + data = pickle.dumps(value, self.pickle_protocol) + try: + self.client.create(node_path, value=data) + except NodeExistsError: + raise ConflictingIdError(job.id) + + def update_job(self, job): + self._ensure_paths() + node_path = os.path.join(self.path, str(job.id)) + changes = { + 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), + 'job_state': job.__getstate__() + } + data = pickle.dumps(changes, self.pickle_protocol) + try: + self.client.set(node_path, value=data) + except NoNodeError: + raise JobLookupError(job.id) + + def remove_job(self, job_id): + self._ensure_paths() + node_path = os.path.join(self.path, str(job_id)) + try: + self.client.delete(node_path) + except NoNodeError: + raise JobLookupError(job_id) + + def remove_all_jobs(self): + try: + self.client.delete(self.path, recursive=True) + except NoNodeError: + pass + self._ensured_path = False + + def shutdown(self): + if self.close_connection_on_exit: + self.client.stop() + self.client.close() + + def _reconstitute_job(self, job_state): + job_state = job_state + job = Job.__new__(Job) + job.__setstate__(job_state) + job._scheduler = self._scheduler + job._jobstore_alias = self._alias + return job + + def _get_jobs(self): + self._ensure_paths() + jobs = [] + failed_job_ids = [] + all_ids = self.client.get_children(self.path) + for node_name in 
all_ids: + try: + node_path = os.path.join(self.path, node_name) + content, _ = self.client.get(node_path) + doc = pickle.loads(content) + job_def = { + 'job_id': node_name, + 'next_run_time': doc['next_run_time'] if doc['next_run_time'] else None, + 'job_state': doc['job_state'], + 'job': self._reconstitute_job(doc['job_state']), + 'creation_time': _.ctime + } + jobs.append(job_def) + except BaseException: + self._logger.exception('Unable to restore job "%s" -- removing it' % node_name) + failed_job_ids.append(node_name) + + # Remove all the jobs we failed to restore + if failed_job_ids: + for failed_id in failed_job_ids: + self.remove_job(failed_id) + paused_sort_key = datetime(9999, 12, 31, tzinfo=utc) + return sorted(jobs, key=lambda job_def: (job_def['job'].next_run_time or paused_sort_key, + job_def['creation_time'])) + + def __repr__(self): + self._logger.exception('<%s (client=%s)>' % (self.__class__.__name__, self.client)) + return '<%s (client=%s)>' % (self.__class__.__name__, self.client) diff --git a/lib/apscheduler/schedulers/asyncio.py b/lib/apscheduler/schedulers/asyncio.py index b91ee97a..289ef13f 100644 --- a/lib/apscheduler/schedulers/asyncio.py +++ b/lib/apscheduler/schedulers/asyncio.py @@ -1,5 +1,5 @@ from __future__ import absolute_import -from functools import wraps +from functools import wraps, partial from apscheduler.schedulers.base import BaseScheduler from apscheduler.util import maybe_ref @@ -10,13 +10,15 @@ except ImportError: # pragma: nocover try: import trollius as asyncio except ImportError: - raise ImportError('AsyncIOScheduler requires either Python 3.4 or the asyncio package installed') + raise ImportError( + 'AsyncIOScheduler requires either Python 3.4 or the asyncio package installed') def run_in_event_loop(func): @wraps(func) def wrapper(self, *args, **kwargs): - self._eventloop.call_soon_threadsafe(func, self, *args, **kwargs) + wrapped = partial(func, self, *args, **kwargs) + self._eventloop.call_soon_threadsafe(wrapped) return wrapper @@ -24,6 +26,8 @@ class AsyncIOScheduler(BaseScheduler): """ A scheduler that runs on an asyncio (:pep:`3156`) event loop. + The default executor can run jobs based on native coroutines (``async def``). 
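A usage sketch for running inside an asyncio program; the job function and interval are invented::

    import asyncio
    from apscheduler.schedulers.asyncio import AsyncIOScheduler

    async def tick():
        print('tick')

    scheduler = AsyncIOScheduler()
    scheduler.add_job(tick, 'interval', seconds=5)
    scheduler.start()
    asyncio.get_event_loop().run_forever()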
+ Extra options: ============== ============================================================= @@ -34,10 +38,6 @@ class AsyncIOScheduler(BaseScheduler): _eventloop = None _timeout = None - def start(self): - super(AsyncIOScheduler, self).start() - self.wakeup() - @run_in_event_loop def shutdown(self, wait=True): super(AsyncIOScheduler, self).shutdown(wait) diff --git a/lib/apscheduler/schedulers/background.py b/lib/apscheduler/schedulers/background.py index 86ff2ba3..03f29822 100644 --- a/lib/apscheduler/schedulers/background.py +++ b/lib/apscheduler/schedulers/background.py @@ -1,4 +1,5 @@ from __future__ import absolute_import + from threading import Thread, Event from apscheduler.schedulers.base import BaseScheduler @@ -13,11 +14,12 @@ class BackgroundScheduler(BlockingScheduler): Extra options: - ========== ============================================================================================ - ``daemon`` Set the ``daemon`` option in the background thread (defaults to ``True``, - see `the documentation `_ + ========== ============================================================================= + ``daemon`` Set the ``daemon`` option in the background thread (defaults to ``True``, see + `the documentation + `_ for further details) - ========== ============================================================================================ + ========== ============================================================================= """ _thread = None @@ -26,14 +28,14 @@ class BackgroundScheduler(BlockingScheduler): self._daemon = asbool(config.pop('daemon', True)) super(BackgroundScheduler, self)._configure(config) - def start(self): - BaseScheduler.start(self) + def start(self, *args, **kwargs): self._event = Event() + BaseScheduler.start(self, *args, **kwargs) self._thread = Thread(target=self._main_loop, name='APScheduler') self._thread.daemon = self._daemon self._thread.start() - def shutdown(self, wait=True): - super(BackgroundScheduler, self).shutdown(wait) + def shutdown(self, *args, **kwargs): + super(BackgroundScheduler, self).shutdown(*args, **kwargs) self._thread.join() del self._thread diff --git a/lib/apscheduler/schedulers/base.py b/lib/apscheduler/schedulers/base.py index cdc0bf06..ab705c32 100644 --- a/lib/apscheduler/schedulers/base.py +++ b/lib/apscheduler/schedulers/base.py @@ -1,9 +1,11 @@ from __future__ import print_function + from abc import ABCMeta, abstractmethod from collections import MutableMapping from threading import RLock -from datetime import datetime +from datetime import datetime, timedelta from logging import getLogger +import warnings import sys from pkg_resources import iter_entry_points @@ -19,20 +21,39 @@ from apscheduler.job import Job from apscheduler.triggers.base import BaseTrigger from apscheduler.util import asbool, asint, astimezone, maybe_ref, timedelta_seconds, undefined from apscheduler.events import ( - SchedulerEvent, JobEvent, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, EVENT_JOBSTORE_ADDED, - EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, EVENT_JOB_ADDED, EVENT_EXECUTOR_ADDED, - EVENT_EXECUTOR_REMOVED, EVENT_ALL_JOBS_REMOVED) + SchedulerEvent, JobEvent, JobSubmissionEvent, EVENT_SCHEDULER_START, EVENT_SCHEDULER_SHUTDOWN, + EVENT_JOBSTORE_ADDED, EVENT_JOBSTORE_REMOVED, EVENT_ALL, EVENT_JOB_MODIFIED, EVENT_JOB_REMOVED, + EVENT_JOB_ADDED, EVENT_EXECUTOR_ADDED, EVENT_EXECUTOR_REMOVED, EVENT_ALL_JOBS_REMOVED, + EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES, EVENT_SCHEDULER_RESUMED, EVENT_SCHEDULER_PAUSED) + +#: 
constant indicating a scheduler's stopped state +STATE_STOPPED = 0 +#: constant indicating a scheduler's running state (started and processing jobs) +STATE_RUNNING = 1 +#: constant indicating a scheduler's paused state (started but not processing jobs) +STATE_PAUSED = 2 class BaseScheduler(six.with_metaclass(ABCMeta)): """ - Abstract base class for all schedulers. Takes the following keyword arguments: + Abstract base class for all schedulers. - :param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to apscheduler.scheduler) + Takes the following keyword arguments: + + :param str|logging.Logger logger: logger to use for the scheduler's logging (defaults to + apscheduler.scheduler) :param str|datetime.tzinfo timezone: the default time zone (defaults to the local timezone) + :param int|float jobstore_retry_interval: the minimum number of seconds to wait between + retries in the scheduler's main loop if the job store raises an exception when getting + the list of due jobs :param dict job_defaults: default values for newly added jobs - :param dict jobstores: a dictionary of job store alias -> job store instance or configuration dict - :param dict executors: a dictionary of executor alias -> executor instance or configuration dict + :param dict jobstores: a dictionary of job store alias -> job store instance or configuration + dict + :param dict executors: a dictionary of executor alias -> executor instance or configuration + dict + + :ivar int state: current running state of the scheduler (one of the following constants from + ``apscheduler.schedulers.base``: ``STATE_STOPPED``, ``STATE_RUNNING``, ``STATE_PAUSED``) .. seealso:: :ref:`scheduler-config` """ @@ -43,7 +64,6 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): _executor_classes = {} _jobstore_plugins = dict((ep.name, ep) for ep in iter_entry_points('apscheduler.jobstores')) _jobstore_classes = {} - _stopped = True # # Public API @@ -58,28 +78,34 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self._listeners = [] self._listeners_lock = self._create_lock() self._pending_jobs = [] + self.state = STATE_STOPPED self.configure(gconfig, **options) def configure(self, gconfig={}, prefix='apscheduler.', **options): """ - Reconfigures the scheduler with the given options. Can only be done when the scheduler isn't running. + Reconfigures the scheduler with the given options. - :param dict gconfig: a "global" configuration dictionary whose values can be overridden by keyword arguments to - this method - :param str|unicode prefix: pick only those keys from ``gconfig`` that are prefixed with this string - (pass an empty string or ``None`` to use all keys) + Can only be done when the scheduler isn't running. 
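A quick configuration sketch (illustrative only; the scheduler subclass and option values are assumptions, but the prefix stripping and dotted-key expansion are as implemented above)::

    from apscheduler.schedulers.background import BackgroundScheduler

    scheduler = BackgroundScheduler()
    # Keys carrying the prefix are kept (prefix stripped); other keys are ignored.
    scheduler.configure({'apscheduler.timezone': 'UTC',
                         'apscheduler.job_defaults.max_instances': '2'},
                        prefix='apscheduler.')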
+ + :param dict gconfig: a "global" configuration dictionary whose values can be overridden by + keyword arguments to this method + :param str|unicode prefix: pick only those keys from ``gconfig`` that are prefixed with + this string (pass an empty string or ``None`` to use all keys) :raises SchedulerAlreadyRunningError: if the scheduler is already running - """ - if self.running: + """ + if self.state != STATE_STOPPED: raise SchedulerAlreadyRunningError - # If a non-empty prefix was given, strip it from the keys in the global configuration dict + # If a non-empty prefix was given, strip it from the keys in the + # global configuration dict if prefix: prefixlen = len(prefix) - gconfig = dict((key[prefixlen:], value) for key, value in six.iteritems(gconfig) if key.startswith(prefix)) + gconfig = dict((key[prefixlen:], value) for key, value in six.iteritems(gconfig) + if key.startswith(prefix)) - # Create a structure from the dotted options (e.g. "a.b.c = d" -> {'a': {'b': {'c': 'd'}}}) + # Create a structure from the dotted options + # (e.g. "a.b.c = d" -> {'a': {'b': {'c': 'd'}}}) config = {} for key, value in six.iteritems(gconfig): parts = key.split('.') @@ -94,15 +120,15 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): config.update(options) self._configure(config) - @abstractmethod - def start(self): + def start(self, paused=False): """ - Starts the scheduler. The details of this process depend on the implementation. + Start the configured executors and job stores and begin processing scheduled jobs. + :param bool paused: if ``True``, don't start job processing until :meth:`resume` is called :raises SchedulerAlreadyRunningError: if the scheduler is already running - """ - if self.running: + """ + if self.state != STATE_STOPPED: raise SchedulerAlreadyRunningError with self._executors_lock: @@ -125,70 +151,109 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): # Schedule all pending jobs for job, jobstore_alias, replace_existing in self._pending_jobs: - self._real_add_job(job, jobstore_alias, replace_existing, False) + self._real_add_job(job, jobstore_alias, replace_existing) del self._pending_jobs[:] - self._stopped = False + self.state = STATE_PAUSED if paused else STATE_RUNNING self._logger.info('Scheduler started') - - # Notify listeners that the scheduler has been started self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_START)) + if not paused: + self.wakeup() + @abstractmethod def shutdown(self, wait=True): """ - Shuts down the scheduler. Does not interrupt any currently running jobs. + Shuts down the scheduler, along with its executors and job stores. + + Does not interrupt any currently running jobs. 
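A minimal sketch of the new ``paused`` start-up path (assumes ``scheduler`` is a not-yet-started ``BackgroundScheduler``)::

    scheduler.start(paused=True)   # executors and job stores start, but no jobs run
    # ... add or adjust jobs while nothing can fire ...
    scheduler.resume()             # job processing begins
    scheduler.shutdown(wait=True)  # waits for running jobs instead of interrupting them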
:param bool wait: ``True`` to wait until all currently executing jobs have finished :raises SchedulerNotRunningError: if the scheduler has not been started yet - """ - if not self.running: + """ + if self.state == STATE_STOPPED: raise SchedulerNotRunningError - self._stopped = True + self.state = STATE_STOPPED - # Shut down all executors - for executor in six.itervalues(self._executors): - executor.shutdown(wait) + with self._jobstores_lock, self._executors_lock: + # Shut down all executors + for executor in six.itervalues(self._executors): + executor.shutdown(wait) - # Shut down all job stores - for jobstore in six.itervalues(self._jobstores): - jobstore.shutdown() + # Shut down all job stores + for jobstore in six.itervalues(self._jobstores): + jobstore.shutdown() self._logger.info('Scheduler has been shut down') self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_SHUTDOWN)) + def pause(self): + """ + Pause job processing in the scheduler. + + This will prevent the scheduler from waking up to do job processing until :meth:`resume` + is called. It will not however stop any already running job processing. + + """ + if self.state == STATE_STOPPED: + raise SchedulerNotRunningError + elif self.state == STATE_RUNNING: + self.state = STATE_PAUSED + self._logger.info('Paused scheduler job processing') + self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_PAUSED)) + + def resume(self): + """Resume job processing in the scheduler.""" + if self.state == STATE_STOPPED: + raise SchedulerNotRunningError + elif self.state == STATE_PAUSED: + self.state = STATE_RUNNING + self._logger.info('Resumed scheduler job processing') + self._dispatch_event(SchedulerEvent(EVENT_SCHEDULER_RESUMED)) + self.wakeup() + @property def running(self): - return not self._stopped + """ + Return ``True`` if the scheduler has been started. + + This is a shortcut for ``scheduler.state != STATE_STOPPED``. + + """ + return self.state != STATE_STOPPED def add_executor(self, executor, alias='default', **executor_opts): """ - Adds an executor to this scheduler. Any extra keyword arguments will be passed to the executor plugin's - constructor, assuming that the first argument is the name of an executor plugin. + Adds an executor to this scheduler. - :param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor instance or the name of - an executor plugin + Any extra keyword arguments will be passed to the executor plugin's constructor, assuming + that the first argument is the name of an executor plugin. 
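Plugin-based usage sketch (``crunch`` and the alias are illustrative names; ``processpool`` is one of the executor entry points shipped with APScheduler)::

    scheduler.add_executor('processpool', alias='heavy', max_workers=4)
    scheduler.add_job(crunch, 'interval', minutes=5, executor='heavy')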
+ + :param str|unicode|apscheduler.executors.base.BaseExecutor executor: either an executor + instance or the name of an executor plugin :param str|unicode alias: alias for the scheduler :raises ValueError: if there is already an executor by the given alias - """ + """ with self._executors_lock: if alias in self._executors: - raise ValueError('This scheduler already has an executor by the alias of "%s"' % alias) + raise ValueError('This scheduler already has an executor by the alias of "%s"' % + alias) if isinstance(executor, BaseExecutor): self._executors[alias] = executor elif isinstance(executor, six.string_types): - self._executors[alias] = executor = self._create_plugin_instance('executor', executor, executor_opts) + self._executors[alias] = executor = self._create_plugin_instance( + 'executor', executor, executor_opts) else: raise TypeError('Expected an executor instance or a string, got %s instead' % executor.__class__.__name__) # Start the executor right away if the scheduler is running - if self.running: - executor.start(self) + if self.state != STATE_STOPPED: + executor.start(self, alias) self._dispatch_event(SchedulerEvent(EVENT_EXECUTOR_ADDED, alias)) @@ -197,10 +262,11 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): Removes the executor by the given alias from this scheduler. :param str|unicode alias: alias of the executor - :param bool shutdown: ``True`` to shut down the executor after removing it - """ + :param bool shutdown: ``True`` to shut down the executor after + removing it - with self._jobstores_lock: + """ + with self._executors_lock: executor = self._lookup_executor(alias) del self._executors[alias] @@ -211,35 +277,39 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): def add_jobstore(self, jobstore, alias='default', **jobstore_opts): """ - Adds a job store to this scheduler. Any extra keyword arguments will be passed to the job store plugin's - constructor, assuming that the first argument is the name of a job store plugin. + Adds a job store to this scheduler. + + Any extra keyword arguments will be passed to the job store plugin's constructor, assuming + that the first argument is the name of a job store plugin. 
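Plugin-based usage sketch (the alias and URL are illustrative; ``sqlalchemy`` is one of the job store entry points shipped with APScheduler)::

    scheduler.add_jobstore('sqlalchemy', alias='persistent',
                           url='sqlite:///jobs.sqlite')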
:param str|unicode|apscheduler.jobstores.base.BaseJobStore jobstore: job store to be added :param str|unicode alias: alias for the job store :raises ValueError: if there is already a job store by the given alias - """ + """ with self._jobstores_lock: if alias in self._jobstores: - raise ValueError('This scheduler already has a job store by the alias of "%s"' % alias) + raise ValueError('This scheduler already has a job store by the alias of "%s"' % + alias) if isinstance(jobstore, BaseJobStore): self._jobstores[alias] = jobstore elif isinstance(jobstore, six.string_types): - self._jobstores[alias] = jobstore = self._create_plugin_instance('jobstore', jobstore, jobstore_opts) + self._jobstores[alias] = jobstore = self._create_plugin_instance( + 'jobstore', jobstore, jobstore_opts) else: raise TypeError('Expected a job store instance or a string, got %s instead' % jobstore.__class__.__name__) - # Start the job store right away if the scheduler is running - if self.running: + # Start the job store right away if the scheduler isn't stopped + if self.state != STATE_STOPPED: jobstore.start(self, alias) # Notify listeners that a new job store has been added self._dispatch_event(SchedulerEvent(EVENT_JOBSTORE_ADDED, alias)) # Notify the scheduler so it can scan the new job store for jobs - if self.running: + if self.state != STATE_STOPPED: self.wakeup() def remove_jobstore(self, alias, shutdown=True): @@ -248,8 +318,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|unicode alias: alias of the job store :param bool shutdown: ``True`` to shut down the job store after removing it - """ + """ with self._jobstores_lock: jobstore = self._lookup_jobstore(alias) del self._jobstores[alias] @@ -263,17 +333,20 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): """ add_listener(callback, mask=EVENT_ALL) - Adds a listener for scheduler events. When a matching event occurs, ``callback`` is executed with the event - object as its sole argument. If the ``mask`` parameter is not provided, the callback will receive events of all - types. + Adds a listener for scheduler events. + + When a matching event occurs, ``callback`` is executed with the event object as its + sole argument. If the ``mask`` parameter is not provided, the callback will receive events + of all types. :param callback: any callable that takes one argument - :param int mask: bitmask that indicates which events should be listened to + :param int mask: bitmask that indicates which events should be + listened to .. seealso:: :mod:`apscheduler.events` .. 
seealso:: :ref:`scheduler-events` - """ + """ with self._listeners_lock: self._listeners.append((callback, mask)) @@ -285,47 +358,55 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): if callback == cb: del self._listeners[i] - def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, - coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', - executor='default', replace_existing=False, **trigger_args): + def add_job(self, func, trigger=None, args=None, kwargs=None, id=None, name=None, + misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, + next_run_time=undefined, jobstore='default', executor='default', + replace_existing=False, **trigger_args): """ - add_job(func, trigger=None, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, \ - coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', \ - executor='default', replace_existing=False, **trigger_args) + add_job(func, trigger=None, args=None, kwargs=None, id=None, \ + name=None, misfire_grace_time=undefined, coalesce=undefined, \ + max_instances=undefined, next_run_time=undefined, \ + jobstore='default', executor='default', \ + replace_existing=False, **trigger_args) Adds the given job to the job list and wakes up the scheduler if it's already running. - Any option that defaults to ``undefined`` will be replaced with the corresponding default value when the job is - scheduled (which happens when the scheduler is started, or immediately if the scheduler is already running). + Any option that defaults to ``undefined`` will be replaced with the corresponding default + value when the job is scheduled (which happens when the scheduler is started, or + immediately if the scheduler is already running). - The ``func`` argument can be given either as a callable object or a textual reference in the - ``package.module:some.object`` format, where the first half (separated by ``:``) is an importable module and the - second half is a reference to the callable object, relative to the module. + The ``func`` argument can be given either as a callable object or a textual reference in + the ``package.module:some.object`` format, where the first half (separated by ``:``) is an + importable module and the second half is a reference to the callable object, relative to + the module. The ``trigger`` argument can either be: - #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case any extra keyword - arguments to this method are passed on to the trigger's constructor + #. the alias name of the trigger (e.g. ``date``, ``interval`` or ``cron``), in which case + any extra keyword arguments to this method are passed on to the trigger's constructor #. 
an instance of a trigger class :param func: callable (or a textual reference to one) to run at the given time - :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when ``func`` is called + :param str|apscheduler.triggers.base.BaseTrigger trigger: trigger that determines when + ``func`` is called :param list|tuple args: list of positional arguments to call func with :param dict kwargs: dict of keyword arguments to call func with :param str|unicode id: explicit identifier for the job (for modifying it later) :param str|unicode name: textual description of the job - :param int misfire_grace_time: seconds after the designated run time that the job is still allowed to be run - :param bool coalesce: run once instead of many times if the scheduler determines that the job should be run more - than once in succession - :param int max_instances: maximum number of concurrently running instances allowed for this job - :param datetime next_run_time: when to first run the job, regardless of the trigger (pass ``None`` to add the - job as paused) + :param int misfire_grace_time: seconds after the designated runtime that the job is still + allowed to be run + :param bool coalesce: run once instead of many times if the scheduler determines that the + job should be run more than once in succession + :param int max_instances: maximum number of concurrently running instances allowed for this + job + :param datetime next_run_time: when to first run the job, regardless of the trigger (pass + ``None`` to add the job as paused) :param str|unicode jobstore: alias of the job store to store the job in :param str|unicode executor: alias of the executor to run the job with - :param bool replace_existing: ``True`` to replace an existing job with the same ``id`` (but retain the - number of runs from the existing one) + :param bool replace_existing: ``True`` to replace an existing job with the same ``id`` + (but retain the number of runs from the existing one) :rtype: Job - """ + """ job_kwargs = { 'trigger': self._create_trigger(trigger, trigger_args), 'executor': executor, @@ -339,45 +420,55 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): 'max_instances': max_instances, 'next_run_time': next_run_time } - job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if value is not undefined) + job_kwargs = dict((key, value) for key, value in six.iteritems(job_kwargs) if + value is not undefined) job = Job(self, **job_kwargs) # Don't really add jobs to job stores before the scheduler is up and running with self._jobstores_lock: - if not self.running: + if self.state == STATE_STOPPED: self._pending_jobs.append((job, jobstore, replace_existing)) - self._logger.info('Adding job tentatively -- it will be properly scheduled when the scheduler starts') + self._logger.info('Adding job tentatively -- it will be properly scheduled when ' + 'the scheduler starts') else: - self._real_add_job(job, jobstore, replace_existing, True) + self._real_add_job(job, jobstore, replace_existing) return job - def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, - coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', - executor='default', **trigger_args): + def scheduled_job(self, trigger, args=None, kwargs=None, id=None, name=None, + misfire_grace_time=undefined, coalesce=undefined, max_instances=undefined, + next_run_time=undefined, jobstore='default', executor='default', + **trigger_args): """ - 
scheduled_job(trigger, args=None, kwargs=None, id=None, name=None, misfire_grace_time=undefined, \ - coalesce=undefined, max_instances=undefined, next_run_time=undefined, jobstore='default', \ + scheduled_job(trigger, args=None, kwargs=None, id=None, \ + name=None, misfire_grace_time=undefined, \ + coalesce=undefined, max_instances=undefined, \ + next_run_time=undefined, jobstore='default', \ executor='default',**trigger_args) - A decorator version of :meth:`add_job`, except that ``replace_existing`` is always ``True``. + A decorator version of :meth:`add_job`, except that ``replace_existing`` is always + ``True``. + + .. important:: The ``id`` argument must be given if scheduling a job in a persistent job + store. The scheduler cannot, however, enforce this requirement. - .. important:: The ``id`` argument must be given if scheduling a job in a persistent job store. The scheduler - cannot, however, enforce this requirement. """ - def inner(func): - self.add_job(func, trigger, args, kwargs, id, name, misfire_grace_time, coalesce, max_instances, - next_run_time, jobstore, executor, True, **trigger_args) + self.add_job(func, trigger, args, kwargs, id, name, misfire_grace_time, coalesce, + max_instances, next_run_time, jobstore, executor, True, **trigger_args) return func return inner def modify_job(self, job_id, jobstore=None, **changes): """ - Modifies the properties of a single job. Modifications are passed to this method as extra keyword arguments. + Modifies the properties of a single job. + + Modifications are passed to this method as extra keyword arguments. :param str|unicode job_id: the identifier of the job :param str|unicode jobstore: alias of the job store that contains the job + :return Job: the relevant job instance + """ with self._jobstores_lock: job, jobstore = self._lookup_job(job_id, jobstore) @@ -388,22 +479,27 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self._dispatch_event(JobEvent(EVENT_JOB_MODIFIED, job_id, jobstore)) # Wake up the scheduler since the job's next run time may have been changed - self.wakeup() + if self.state == STATE_RUNNING: + self.wakeup() + + return job def reschedule_job(self, job_id, jobstore=None, trigger=None, **trigger_args): """ Constructs a new trigger for a job and updates its next run time. + Extra keyword arguments are passed directly to the trigger's constructor. 
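Usage sketch (``report`` is an illustrative callable; note that the method now returns the ``Job``)::

    scheduler.add_job(report, 'interval', hours=1, id='report')
    # Later: move the same job onto a cron schedule
    job = scheduler.reschedule_job('report', trigger='cron', hour=8, minute=30)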
:param str|unicode job_id: the identifier of the job :param str|unicode jobstore: alias of the job store that contains the job :param trigger: alias of the trigger type or a trigger instance - """ + :return Job: the relevant job instance + """ trigger = self._create_trigger(trigger, trigger_args) now = datetime.now(self.timezone) next_run_time = trigger.get_next_fire_time(None, now) - self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time) + return self.modify_job(job_id, jobstore, trigger=trigger, next_run_time=next_run_time) def pause_job(self, job_id, jobstore=None): """ @@ -411,9 +507,10 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|unicode job_id: the identifier of the job :param str|unicode jobstore: alias of the job store that contains the job - """ + :return Job: the relevant job instance - self.modify_job(job_id, jobstore, next_run_time=None) + """ + return self.modify_job(job_id, jobstore, next_run_time=None) def resume_job(self, job_id, jobstore=None): """ @@ -421,38 +518,44 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|unicode job_id: the identifier of the job :param str|unicode jobstore: alias of the job store that contains the job - """ + :return Job|None: the relevant job instance if the job was rescheduled, or ``None`` if no + next run time could be calculated and the job was removed + """ with self._jobstores_lock: job, jobstore = self._lookup_job(job_id, jobstore) now = datetime.now(self.timezone) next_run_time = job.trigger.get_next_fire_time(None, now) if next_run_time: - self.modify_job(job_id, jobstore, next_run_time=next_run_time) + return self.modify_job(job_id, jobstore, next_run_time=next_run_time) else: self.remove_job(job.id, jobstore) def get_jobs(self, jobstore=None, pending=None): """ - Returns a list of pending jobs (if the scheduler hasn't been started yet) and scheduled jobs, either from a - specific job store or from all of them. + Returns a list of pending jobs (if the scheduler hasn't been started yet) and scheduled + jobs, either from a specific job store or from all of them. + + If the scheduler has not been started yet, only pending jobs can be returned because the + job stores haven't been started yet either. 
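Usage sketch (``persistent`` is an illustrative job store alias)::

    all_jobs = scheduler.get_jobs()   # pending or scheduled, depending on scheduler state
    some_jobs = scheduler.get_jobs(jobstore='persistent')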
:param str|unicode jobstore: alias of the job store - :param bool pending: ``False`` to leave out pending jobs (jobs that are waiting for the scheduler start to be - added to their respective job stores), ``True`` to only include pending jobs, anything else - to return both + :param bool pending: **DEPRECATED** :rtype: list[Job] + """ + if pending is not None: + warnings.warn('The "pending" option is deprecated -- get_jobs() always returns ' + 'pending jobs if the scheduler has been started and scheduled jobs ' + 'otherwise', DeprecationWarning) with self._jobstores_lock: jobs = [] - - if pending is not False: + if self.state == STATE_STOPPED: for job, alias, replace_existing in self._pending_jobs: if jobstore is None or alias == jobstore: jobs.append(job) - - if pending is not True: + else: for alias, store in six.iteritems(self._jobstores): if jobstore is None or alias == jobstore: jobs.extend(store.get_all_jobs()) @@ -467,8 +570,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|unicode jobstore: alias of the job store that most likely contains the job :return: the Job by the given ID, or ``None`` if it wasn't found :rtype: Job - """ + """ with self._jobstores_lock: try: return self._lookup_job(job_id, jobstore)[0] @@ -482,32 +585,35 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|unicode job_id: the identifier of the job :param str|unicode jobstore: alias of the job store that contains the job :raises JobLookupError: if the job was not found - """ + """ + jobstore_alias = None with self._jobstores_lock: - # Check if the job is among the pending jobs - for i, (job, jobstore_alias, replace_existing) in enumerate(self._pending_jobs): - if job.id == job_id: - del self._pending_jobs[i] - jobstore = jobstore_alias - break + if self.state == STATE_STOPPED: + # Check if the job is among the pending jobs + if self.state == STATE_STOPPED: + for i, (job, alias, replace_existing) in enumerate(self._pending_jobs): + if job.id == job_id and jobstore in (None, alias): + del self._pending_jobs[i] + jobstore_alias = alias + break else: - # Otherwise, try to remove it from each store until it succeeds or we run out of stores to check + # Otherwise, try to remove it from each store until it succeeds or we run out of + # stores to check for alias, store in six.iteritems(self._jobstores): if jobstore in (None, alias): try: store.remove_job(job_id) + jobstore_alias = alias + break except JobLookupError: continue - jobstore = alias - break - - if jobstore is None: + if jobstore_alias is None: raise JobLookupError(job_id) # Notify listeners that a job has been removed - event = JobEvent(EVENT_JOB_REMOVED, job_id, jobstore) + event = JobEvent(EVENT_JOB_REMOVED, job_id, jobstore_alias) self._dispatch_event(event) self._logger.info('Removed job %s', job_id) @@ -517,17 +623,19 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): Removes all jobs from the specified job store, or all job stores if none is given. 
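Usage sketch (the job ID and store alias are illustrative)::

    scheduler.remove_job('report')                    # searched for in all stores
    scheduler.remove_all_jobs(jobstore='persistent')  # clear just one store
    scheduler.remove_all_jobs()                       # clear every store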
:param str|unicode jobstore: alias of the job store + """ - with self._jobstores_lock: - if jobstore: - self._pending_jobs = [pending for pending in self._pending_jobs if pending[1] != jobstore] + if self.state == STATE_STOPPED: + if jobstore: + self._pending_jobs = [pending for pending in self._pending_jobs if + pending[1] != jobstore] + else: + self._pending_jobs = [] else: - self._pending_jobs = [] - - for alias, store in six.iteritems(self._jobstores): - if jobstore in (None, alias): - store.remove_all_jobs() + for alias, store in six.iteritems(self._jobstores): + if jobstore in (None, alias): + store.remove_all_jobs() self._dispatch_event(SchedulerEvent(EVENT_ALL_JOBS_REMOVED, jobstore)) @@ -535,29 +643,34 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): """ print_jobs(jobstore=None, out=sys.stdout) - Prints out a textual listing of all jobs currently scheduled on either all job stores or just a specific one. + Prints out a textual listing of all jobs currently scheduled on either all job stores or + just a specific one. :param str|unicode jobstore: alias of the job store, ``None`` to list jobs from all stores - :param file out: a file-like object to print to (defaults to **sys.stdout** if nothing is given) - """ + :param file out: a file-like object to print to (defaults to **sys.stdout** if nothing is + given) + """ out = out or sys.stdout with self._jobstores_lock: - if self._pending_jobs: - print(six.u('Pending jobs:'), file=out) - for job, jobstore_alias, replace_existing in self._pending_jobs: - if jobstore in (None, jobstore_alias): - print(six.u(' %s') % job, file=out) - - for alias, store in six.iteritems(self._jobstores): - if jobstore in (None, alias): - print(six.u('Jobstore %s:') % alias, file=out) - jobs = store.get_all_jobs() - if jobs: - for job in jobs: - print(six.u(' %s') % job, file=out) - else: - print(six.u(' No scheduled jobs'), file=out) + if self.state == STATE_STOPPED: + print(u'Pending jobs:', file=out) + if self._pending_jobs: + for job, jobstore_alias, replace_existing in self._pending_jobs: + if jobstore in (None, jobstore_alias): + print(u' %s' % job, file=out) + else: + print(u' No pending jobs', file=out) + else: + for alias, store in sorted(six.iteritems(self._jobstores)): + if jobstore in (None, alias): + print(u'Jobstore %s:' % alias, file=out) + jobs = store.get_all_jobs() + if jobs: + for job in jobs: + print(u' %s' % job, file=out) + else: + print(u' No scheduled jobs', file=out) @abstractmethod def wakeup(self): @@ -574,6 +687,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): # Set general options self._logger = maybe_ref(config.pop('logger', None)) or getLogger('apscheduler.scheduler') self.timezone = astimezone(config.pop('timezone', None)) or get_localzone() + self.jobstore_retry_interval = float(config.pop('jobstore_retry_interval', 10)) # Set the job defaults job_defaults = config.get('job_defaults', {}) @@ -597,12 +711,15 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): cls = maybe_ref(executor_class) executor = cls(**value) else: - raise ValueError('Cannot create executor "%s" -- either "type" or "class" must be defined' % alias) + raise ValueError( + 'Cannot create executor "%s" -- either "type" or "class" must be defined' % + alias) self.add_executor(executor, alias) else: - raise TypeError("Expected executor instance or dict for executors['%s'], got %s instead" % ( - alias, value.__class__.__name__)) + raise TypeError( + "Expected executor instance or dict for executors['%s'], got %s instead" % + (alias, 
value.__class__.__name__)) # Configure job stores self._jobstores.clear() @@ -618,31 +735,33 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): cls = maybe_ref(jobstore_class) jobstore = cls(**value) else: - raise ValueError('Cannot create job store "%s" -- either "type" or "class" must be defined' % alias) + raise ValueError( + 'Cannot create job store "%s" -- either "type" or "class" must be ' + 'defined' % alias) self.add_jobstore(jobstore, alias) else: - raise TypeError("Expected job store instance or dict for jobstores['%s'], got %s instead" % ( - alias, value.__class__.__name__)) + raise TypeError( + "Expected job store instance or dict for jobstores['%s'], got %s instead" % + (alias, value.__class__.__name__)) def _create_default_executor(self): """Creates a default executor store, specific to the particular scheduler type.""" - return ThreadPoolExecutor() def _create_default_jobstore(self): """Creates a default job store, specific to the particular scheduler type.""" - return MemoryJobStore() def _lookup_executor(self, alias): """ - Returns the executor instance by the given name from the list of executors that were added to this scheduler. + Returns the executor instance by the given name from the list of executors that were added + to this scheduler. :type alias: str :raises KeyError: if no executor by the given alias is not found - """ + """ try: return self._executors[alias] except KeyError: @@ -650,12 +769,13 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): def _lookup_jobstore(self, alias): """ - Returns the job store instance by the given name from the list of job stores that were added to this scheduler. + Returns the job store instance by the given name from the list of job stores that were + added to this scheduler. :type alias: str :raises KeyError: if no job store by the given alias is not found - """ + """ try: return self._jobstores[alias] except KeyError: @@ -667,21 +787,23 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :type job_id: str :param str jobstore_alias: alias of a job store to look in - :return tuple[Job, str]: a tuple of job, jobstore alias (jobstore alias is None in case of a pending job) + :return tuple[Job, str]: a tuple of job, jobstore alias (jobstore alias is None in case of + a pending job) :raises JobLookupError: if no job by the given ID is found. + """ - - # Check if the job is among the pending jobs - for job, alias, replace_existing in self._pending_jobs: - if job.id == job_id: - return job, None - - # Look in all job stores - for alias, store in six.iteritems(self._jobstores): - if jobstore_alias in (None, alias): - job = store.lookup_job(job_id) - if job is not None: - return job, alias + if self.state == STATE_STOPPED: + # Check if the job is among the pending jobs + for job, alias, replace_existing in self._pending_jobs: + if job.id == job_id: + return job, None + else: + # Look in all job stores + for alias, store in six.iteritems(self._jobstores): + if jobstore_alias in (None, alias): + job = store.lookup_job(job_id) + if job is not None: + return job, alias raise JobLookupError(job_id) @@ -690,8 +812,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): Dispatches the given event to interested listeners. 
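The new submission events dispatched here can be observed through an ordinary listener, roughly as follows (sketch; ``on_submission`` is an illustrative callback)::

    from apscheduler.events import EVENT_JOB_SUBMITTED, EVENT_JOB_MAX_INSTANCES

    def on_submission(event):
        # A JobSubmissionEvent carries the run times the job was submitted for
        print(event.job_id, event.scheduled_run_times)

    scheduler.add_listener(on_submission,
                           EVENT_JOB_SUBMITTED | EVENT_JOB_MAX_INSTANCES)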
:param SchedulerEvent event: the event to send - """ + """ with self._listeners_lock: listeners = tuple(self._listeners) @@ -699,16 +821,16 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): if event.code & mask: try: cb(event) - except: + except BaseException: self._logger.exception('Error notifying listener') - def _real_add_job(self, job, jobstore_alias, replace_existing, wakeup): + def _real_add_job(self, job, jobstore_alias, replace_existing): """ :param Job job: the job to add - :param bool replace_existing: ``True`` to use update_job() in case the job already exists in the store - :param bool wakeup: ``True`` to wake up the scheduler after adding the job - """ + :param bool replace_existing: ``True`` to use update_job() in case the job already exists + in the store + """ # Fill in undefined values with defaults replacements = {} for key, value in six.iteritems(self._job_defaults): @@ -743,12 +865,11 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self._logger.info('Added job "%s" to job store "%s"', job.name, jobstore_alias) # Notify the scheduler about the new job - if wakeup: + if self.state == STATE_RUNNING: self.wakeup() def _create_plugin_instance(self, type_, alias, constructor_kwargs): """Creates an instance of the given plugin type, loading the plugin first if necessary.""" - plugin_container, class_container, base_class = { 'trigger': (self._trigger_plugins, self._trigger_classes, BaseTrigger), 'jobstore': (self._jobstore_plugins, self._jobstore_classes, BaseJobStore), @@ -761,7 +882,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): if alias in plugin_container: plugin_cls = class_container[alias] = plugin_container[alias].load() if not issubclass(plugin_cls, base_class): - raise TypeError('The {0} entry point does not point to a {0} class'.format(type_)) + raise TypeError('The {0} entry point does not point to a {0} class'. + format(type_)) else: raise LookupError('No {0} by the name "{1}" was found'.format(type_, alias)) @@ -773,7 +895,8 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): elif trigger is None: trigger = 'date' elif not isinstance(trigger, six.string_types): - raise TypeError('Expected a trigger instance or string, got %s instead' % trigger.__class__.__name__) + raise TypeError('Expected a trigger instance or string, got %s instead' % + trigger.__class__.__name__) # Use the scheduler's time zone if nothing else is specified trigger_args.setdefault('timezone', self.timezone) @@ -783,29 +906,48 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): def _create_lock(self): """Creates a reentrant lock object.""" - return RLock() def _process_jobs(self): """ - Iterates through jobs in every jobstore, starts jobs that are due and figures out how long to wait for the next - round. + Iterates through jobs in every jobstore, starts jobs that are due and figures out how long + to wait for the next round. + + If the ``get_due_jobs()`` call raises an exception, a new wakeup is scheduled in at least + ``jobstore_retry_interval`` seconds. 
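Since the retry interval is read in ``_configure()`` like any other scheduler option, it can be set at construction time, e.g. (sketch; the value is illustrative)::

    scheduler = BackgroundScheduler(jobstore_retry_interval=30)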
+ """ + if self.state == STATE_PAUSED: + self._logger.debug('Scheduler is paused -- not processing jobs') + return None self._logger.debug('Looking for jobs to run') now = datetime.now(self.timezone) next_wakeup_time = None + events = [] with self._jobstores_lock: for jobstore_alias, jobstore in six.iteritems(self._jobstores): - for job in jobstore.get_due_jobs(now): + try: + due_jobs = jobstore.get_due_jobs(now) + except Exception as e: + # Schedule a wakeup at least in jobstore_retry_interval seconds + self._logger.warning('Error getting due jobs from job store %r: %s', + jobstore_alias, e) + retry_wakeup_time = now + timedelta(seconds=self.jobstore_retry_interval) + if not next_wakeup_time or next_wakeup_time > retry_wakeup_time: + next_wakeup_time = retry_wakeup_time + + continue + + for job in due_jobs: # Look up the job's executor try: executor = self._lookup_executor(job.executor) - except: + except BaseException: self._logger.error( - 'Executor lookup ("%s") failed for job "%s" -- removing it from the job store', - job.executor, job) + 'Executor lookup ("%s") failed for job "%s" -- removing it from the ' + 'job store', job.executor, job) self.remove_job(job.id, jobstore_alias) continue @@ -816,12 +958,21 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): executor.submit_job(job, run_times) except MaxInstancesReachedError: self._logger.warning( - 'Execution of job "%s" skipped: maximum number of running instances reached (%d)', - job, job.max_instances) - except: - self._logger.exception('Error submitting job "%s" to executor "%s"', job, job.executor) + 'Execution of job "%s" skipped: maximum number of running ' + 'instances reached (%d)', job, job.max_instances) + event = JobSubmissionEvent(EVENT_JOB_MAX_INSTANCES, job.id, + jobstore_alias, run_times) + events.append(event) + except BaseException: + self._logger.exception('Error submitting job "%s" to executor "%s"', + job, job.executor) + else: + event = JobSubmissionEvent(EVENT_JOB_SUBMITTED, job.id, jobstore_alias, + run_times) + events.append(event) - # Update the job if it has a next execution time. Otherwise remove it from the job store. + # Update the job if it has a next execution time. + # Otherwise remove it from the job store. 
job_next_run = job.trigger.get_next_fire_time(run_times[-1], now) if job_next_run: job._modify(next_run_time=job_next_run) @@ -829,17 +980,27 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): else: self.remove_job(job.id, jobstore_alias) - # Set a new next wakeup time if there isn't one yet or the jobstore has an even earlier one + # Set a new next wakeup time if there isn't one yet or + # the jobstore has an even earlier one jobstore_next_run_time = jobstore.get_next_run_time() - if jobstore_next_run_time and (next_wakeup_time is None or jobstore_next_run_time < next_wakeup_time): - next_wakeup_time = jobstore_next_run_time + if jobstore_next_run_time and (next_wakeup_time is None or + jobstore_next_run_time < next_wakeup_time): + next_wakeup_time = jobstore_next_run_time.astimezone(self.timezone) + + # Dispatch collected events + for event in events: + self._dispatch_event(event) # Determine the delay until this method should be called again - if next_wakeup_time is not None: - wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0) - self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, wait_seconds) - else: + if self.state == STATE_PAUSED: + wait_seconds = None + self._logger.debug('Scheduler is paused; waiting until resume() is called') + elif next_wakeup_time is None: wait_seconds = None self._logger.debug('No jobs; waiting until a job is added') + else: + wait_seconds = max(timedelta_seconds(next_wakeup_time - now), 0) + self._logger.debug('Next wakeup is due at %s (in %f seconds)', next_wakeup_time, + wait_seconds) return wait_seconds diff --git a/lib/apscheduler/schedulers/blocking.py b/lib/apscheduler/schedulers/blocking.py index 2720822c..e6171575 100644 --- a/lib/apscheduler/schedulers/blocking.py +++ b/lib/apscheduler/schedulers/blocking.py @@ -1,21 +1,21 @@ from __future__ import absolute_import + from threading import Event -from apscheduler.schedulers.base import BaseScheduler +from apscheduler.schedulers.base import BaseScheduler, STATE_STOPPED +from apscheduler.util import TIMEOUT_MAX class BlockingScheduler(BaseScheduler): """ - A scheduler that runs in the foreground (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block). + A scheduler that runs in the foreground + (:meth:`~apscheduler.schedulers.base.BaseScheduler.start` will block). 
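Usage sketch (``tick`` is an illustrative callable)::

    from apscheduler.schedulers.blocking import BlockingScheduler

    scheduler = BlockingScheduler()
    scheduler.add_job(tick, 'interval', seconds=10)
    scheduler.start()   # blocks here; call shutdown() from a job or a signal handler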
""" - - MAX_WAIT_TIME = 4294967 # Maximum value accepted by Event.wait() on Windows - _event = None - def start(self): - super(BlockingScheduler, self).start() + def start(self, *args, **kwargs): self._event = Event() + super(BlockingScheduler, self).start(*args, **kwargs) self._main_loop() def shutdown(self, wait=True): @@ -23,10 +23,11 @@ class BlockingScheduler(BaseScheduler): self._event.set() def _main_loop(self): - while self.running: - wait_seconds = self._process_jobs() - self._event.wait(wait_seconds if wait_seconds is not None else self.MAX_WAIT_TIME) + wait_seconds = TIMEOUT_MAX + while self.state != STATE_STOPPED: + self._event.wait(wait_seconds) self._event.clear() + wait_seconds = self._process_jobs() def wakeup(self): self._event.set() diff --git a/lib/apscheduler/schedulers/gevent.py b/lib/apscheduler/schedulers/gevent.py index 9cce6589..d48ed74a 100644 --- a/lib/apscheduler/schedulers/gevent.py +++ b/lib/apscheduler/schedulers/gevent.py @@ -16,14 +16,14 @@ class GeventScheduler(BlockingScheduler): _greenlet = None - def start(self): - BaseScheduler.start(self) + def start(self, *args, **kwargs): self._event = Event() + BaseScheduler.start(self, *args, **kwargs) self._greenlet = gevent.spawn(self._main_loop) return self._greenlet - def shutdown(self, wait=True): - super(GeventScheduler, self).shutdown(wait) + def shutdown(self, *args, **kwargs): + super(GeventScheduler, self).shutdown(*args, **kwargs) self._greenlet.join() del self._greenlet diff --git a/lib/apscheduler/schedulers/qt.py b/lib/apscheduler/schedulers/qt.py index dde5afaa..6ee5d332 100644 --- a/lib/apscheduler/schedulers/qt.py +++ b/lib/apscheduler/schedulers/qt.py @@ -4,7 +4,7 @@ from apscheduler.schedulers.base import BaseScheduler try: from PyQt5.QtCore import QObject, QTimer -except ImportError: # pragma: nocover +except (ImportError, RuntimeError): # pragma: nocover try: from PyQt4.QtCore import QObject, QTimer except ImportError: @@ -19,12 +19,8 @@ class QtScheduler(BaseScheduler): _timer = None - def start(self): - super(QtScheduler, self).start() - self.wakeup() - - def shutdown(self, wait=True): - super(QtScheduler, self).shutdown(wait) + def shutdown(self, *args, **kwargs): + super(QtScheduler, self).shutdown(*args, **kwargs) self._stop_timer() def _start_timer(self, wait_seconds): diff --git a/lib/apscheduler/schedulers/tornado.py b/lib/apscheduler/schedulers/tornado.py index 78093308..0a9171f2 100644 --- a/lib/apscheduler/schedulers/tornado.py +++ b/lib/apscheduler/schedulers/tornado.py @@ -1,4 +1,5 @@ from __future__ import absolute_import + from datetime import timedelta from functools import wraps @@ -22,6 +23,8 @@ class TornadoScheduler(BaseScheduler): """ A scheduler that runs on a Tornado IOLoop. + The default executor can run jobs based on native coroutines (``async def``). 
+ =========== =============================================================== ``io_loop`` Tornado IOLoop instance to use (defaults to the global IO loop) =========== =============================================================== @@ -30,10 +33,6 @@ class TornadoScheduler(BaseScheduler): _ioloop = None _timeout = None - def start(self): - super(TornadoScheduler, self).start() - self.wakeup() - @run_in_ioloop def shutdown(self, wait=True): super(TornadoScheduler, self).shutdown(wait) @@ -53,6 +52,10 @@ class TornadoScheduler(BaseScheduler): self._ioloop.remove_timeout(self._timeout) del self._timeout + def _create_default_executor(self): + from apscheduler.executors.tornado import TornadoExecutor + return TornadoExecutor() + @run_in_ioloop def wakeup(self): self._stop_timer() diff --git a/lib/apscheduler/schedulers/twisted.py b/lib/apscheduler/schedulers/twisted.py index 166b6130..6b43a84b 100644 --- a/lib/apscheduler/schedulers/twisted.py +++ b/lib/apscheduler/schedulers/twisted.py @@ -1,4 +1,5 @@ from __future__ import absolute_import + from functools import wraps from apscheduler.schedulers.base import BaseScheduler @@ -35,10 +36,6 @@ class TwistedScheduler(BaseScheduler): self._reactor = maybe_ref(config.pop('reactor', default_reactor)) super(TwistedScheduler, self)._configure(config) - def start(self): - super(TwistedScheduler, self).start() - self.wakeup() - @run_in_reactor def shutdown(self, wait=True): super(TwistedScheduler, self).shutdown(wait) diff --git a/lib/apscheduler/triggers/base.py b/lib/apscheduler/triggers/base.py index 3520d316..ce2526a8 100644 --- a/lib/apscheduler/triggers/base.py +++ b/lib/apscheduler/triggers/base.py @@ -1,4 +1,6 @@ from abc import ABCMeta, abstractmethod +from datetime import timedelta +import random import six @@ -6,11 +8,41 @@ import six class BaseTrigger(six.with_metaclass(ABCMeta)): """Abstract base class that defines the interface that every trigger must implement.""" + __slots__ = () + @abstractmethod def get_next_fire_time(self, previous_fire_time, now): """ - Returns the next datetime to fire on, If no such datetime can be calculated, returns ``None``. + Returns the next datetime to fire on, If no such datetime can be calculated, returns + ``None``. :param datetime.datetime previous_fire_time: the previous time the trigger was fired :param datetime.datetime now: current datetime """ + + def _apply_jitter(self, next_fire_time, jitter, now): + """ + Randomize ``next_fire_time`` by adding or subtracting a random value (the jitter). If the + resulting datetime is in the past, returns the initial ``next_fire_time`` without jitter. + + ``next_fire_time - jitter <= result <= next_fire_time + jitter`` + + :param datetime.datetime|None next_fire_time: next fire time without jitter applied. If + ``None``, returns ``None``. + :param int|None jitter: maximum number of seconds to add or subtract to + ``next_fire_time``. If ``None`` or ``0``, returns ``next_fire_time`` + :param datetime.datetime now: current datetime + :return datetime.datetime|None: next fire time with a jitter. + """ + if next_fire_time is None or not jitter: + return next_fire_time + + next_fire_time_with_jitter = next_fire_time + timedelta( + seconds=random.uniform(-jitter, jitter)) + + if next_fire_time_with_jitter < now: + # Next fire time with jitter is in the past. + # Ignore jitter to avoid false misfire. 
+            return next_fire_time
+
+        return next_fire_time_with_jitter
diff --git a/lib/apscheduler/triggers/combining.py b/lib/apscheduler/triggers/combining.py
new file mode 100644
index 00000000..64f83011
--- /dev/null
+++ b/lib/apscheduler/triggers/combining.py
@@ -0,0 +1,95 @@
+from apscheduler.triggers.base import BaseTrigger
+from apscheduler.util import obj_to_ref, ref_to_obj
+
+
+class BaseCombiningTrigger(BaseTrigger):
+    __slots__ = ('triggers', 'jitter')
+
+    def __init__(self, triggers, jitter=None):
+        self.triggers = triggers
+        self.jitter = jitter
+
+    def __getstate__(self):
+        return {
+            'version': 1,
+            'triggers': [(obj_to_ref(trigger.__class__), trigger.__getstate__())
+                         for trigger in self.triggers],
+            'jitter': self.jitter
+        }
+
+    def __setstate__(self, state):
+        if state.get('version', 1) > 1:
+            raise ValueError(
+                'Got serialized data for version %s of %s, but only versions up to 1 can be '
+                'handled' % (state['version'], self.__class__.__name__))
+
+        self.jitter = state['jitter']
+        self.triggers = []
+        for clsref, state in state['triggers']:
+            cls = ref_to_obj(clsref)
+            trigger = cls.__new__(cls)
+            trigger.__setstate__(state)
+            self.triggers.append(trigger)
+
+    def __repr__(self):
+        return '<{}({}{})>'.format(self.__class__.__name__, self.triggers,
+                                   ', jitter={}'.format(self.jitter) if self.jitter else '')
+
+
+class AndTrigger(BaseCombiningTrigger):
+    """
+    Always returns the earliest next fire time that all the given triggers can agree on.
+    The trigger is considered to be finished when any of the given triggers has finished its
+    schedule.
+
+    Trigger alias: ``and``
+
+    :param list triggers: triggers to combine
+    :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+    """
+
+    __slots__ = ()
+
+    def get_next_fire_time(self, previous_fire_time, now):
+        while True:
+            fire_times = [trigger.get_next_fire_time(previous_fire_time, now)
+                          for trigger in self.triggers]
+            if None in fire_times:
+                return None
+            elif min(fire_times) == max(fire_times):
+                return self._apply_jitter(fire_times[0], self.jitter, now)
+            else:
+                now = max(fire_times)
+
+    def __str__(self):
+        return 'and[{}]'.format(', '.join(str(trigger) for trigger in self.triggers))
+
+
+class OrTrigger(BaseCombiningTrigger):
+    """
+    Always returns the earliest next fire time produced by any of the given triggers.
+    The trigger is considered finished when all the given triggers have finished their schedules.
+
+    Trigger alias: ``or``
+
+    :param list triggers: triggers to combine
+    :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
+
+    .. note:: Triggers that depend on the previous fire time, such as the interval trigger, may
+        seem to behave strangely since they are always passed the previous fire time produced by
+        any of the given triggers.
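Combining usage sketch (``job_function`` is an illustrative callable; assumes a started scheduler)::

    from apscheduler.triggers.combining import AndTrigger
    from apscheduler.triggers.cron import CronTrigger
    from apscheduler.triggers.interval import IntervalTrigger

    # Fire every two hours, but only when the time also falls on a Saturday
    trigger = AndTrigger([IntervalTrigger(hours=2),
                          CronTrigger(day_of_week='sat')])
    scheduler.add_job(job_function, trigger)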
+ """ + + __slots__ = () + + def get_next_fire_time(self, previous_fire_time, now): + fire_times = [trigger.get_next_fire_time(previous_fire_time, now) + for trigger in self.triggers] + fire_times = [fire_time for fire_time in fire_times if fire_time is not None] + if fire_times: + return self._apply_jitter(min(fire_times), self.jitter, now) + else: + return None + + def __str__(self): + return 'or[{}]'.format(', '.join(str(trigger) for trigger in self.triggers)) diff --git a/lib/apscheduler/triggers/cron/__init__.py b/lib/apscheduler/triggers/cron/__init__.py index 8df901ea..e10f893f 100644 --- a/lib/apscheduler/triggers/cron/__init__.py +++ b/lib/apscheduler/triggers/cron/__init__.py @@ -4,13 +4,15 @@ from tzlocal import get_localzone import six from apscheduler.triggers.base import BaseTrigger -from apscheduler.triggers.cron.fields import BaseField, WeekField, DayOfMonthField, DayOfWeekField, DEFAULT_VALUES +from apscheduler.triggers.cron.fields import ( + BaseField, MonthField, WeekField, DayOfMonthField, DayOfWeekField, DEFAULT_VALUES) from apscheduler.util import datetime_ceil, convert_to_datetime, datetime_repr, astimezone class CronTrigger(BaseTrigger): """ - Triggers when current time matches all specified time constraints, similarly to how the UNIX cron scheduler works. + Triggers when current time matches all specified time constraints, + similarly to how the UNIX cron scheduler works. :param int|str year: 4-digit year :param int|str month: month (1-12) @@ -22,8 +24,9 @@ class CronTrigger(BaseTrigger): :param int|str second: second (0-59) :param datetime|str start_date: earliest possible date/time to trigger on (inclusive) :param datetime|str end_date: latest possible date/time to trigger on (inclusive) - :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations - (defaults to scheduler timezone) + :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults + to scheduler timezone) + :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. .. note:: The first weekday is always **monday**. 
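Jitter usage sketch (``job_function`` is an illustrative callable; extra keyword arguments to ``add_job()`` are passed to the trigger constructor)::

    # On the hour, every hour, shifted randomly by up to 120 seconds either way
    scheduler.add_job(job_function, 'cron', minute=0, jitter=120)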
""" @@ -31,7 +34,7 @@ class CronTrigger(BaseTrigger): FIELD_NAMES = ('year', 'month', 'day', 'week', 'day_of_week', 'hour', 'minute', 'second') FIELDS_MAP = { 'year': BaseField, - 'month': BaseField, + 'month': MonthField, 'week': WeekField, 'day': DayOfMonthField, 'day_of_week': DayOfWeekField, @@ -40,15 +43,16 @@ class CronTrigger(BaseTrigger): 'second': BaseField } - __slots__ = 'timezone', 'start_date', 'end_date', 'fields' + __slots__ = 'timezone', 'start_date', 'end_date', 'fields', 'jitter' - def __init__(self, year=None, month=None, day=None, week=None, day_of_week=None, hour=None, minute=None, - second=None, start_date=None, end_date=None, timezone=None): + def __init__(self, year=None, month=None, day=None, week=None, day_of_week=None, hour=None, + minute=None, second=None, start_date=None, end_date=None, timezone=None, + jitter=None): if timezone: self.timezone = astimezone(timezone) - elif start_date and start_date.tzinfo: + elif isinstance(start_date, datetime) and start_date.tzinfo: self.timezone = start_date.tzinfo - elif end_date and end_date.tzinfo: + elif isinstance(end_date, datetime) and end_date.tzinfo: self.timezone = end_date.tzinfo else: self.timezone = get_localzone() @@ -56,6 +60,8 @@ class CronTrigger(BaseTrigger): self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date') self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date') + self.jitter = jitter + values = dict((key, value) for (key, value) in six.iteritems(locals()) if key in self.FIELD_NAMES and value is not None) self.fields = [] @@ -76,13 +82,35 @@ class CronTrigger(BaseTrigger): field = field_class(field_name, exprs, is_default) self.fields.append(field) + @classmethod + def from_crontab(cls, expr, timezone=None): + """ + Create a :class:`~CronTrigger` from a standard crontab expression. + + See https://en.wikipedia.org/wiki/Cron for more information on the format accepted here. + + :param expr: minute, hour, day of month, month, day of week + :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations ( + defaults to scheduler timezone) + :return: a :class:`~CronTrigger` instance + + """ + values = expr.split() + if len(values) != 5: + raise ValueError('Wrong number of fields; got {}, expected 5'.format(len(values))) + + return cls(minute=values[0], hour=values[1], day=values[2], month=values[3], + day_of_week=values[4], timezone=timezone) + def _increment_field_value(self, dateval, fieldnum): """ - Increments the designated field and resets all less significant fields to their minimum values. + Increments the designated field and resets all less significant fields to their minimum + values. 
:type dateval: datetime :type fieldnum: int - :return: a tuple containing the new date, and the number of the field that was actually incremented + :return: a tuple containing the new date, and the number of the field that was actually + incremented :rtype: tuple """ @@ -128,12 +156,13 @@ class CronTrigger(BaseTrigger): else: values[field.name] = new_value - difference = datetime(**values) - dateval.replace(tzinfo=None) - return self.timezone.normalize(dateval + difference) + return self.timezone.localize(datetime(**values)) def get_next_fire_time(self, previous_fire_time, now): if previous_fire_time: - start_date = max(now, previous_fire_time + timedelta(microseconds=1)) + start_date = min(now, previous_fire_time + timedelta(microseconds=1)) + if start_date == previous_fire_time: + start_date += timedelta(microseconds=1) else: start_date = max(now, self.start_date) if self.start_date else now @@ -163,8 +192,36 @@ class CronTrigger(BaseTrigger): return None if fieldnum >= 0: + if self.jitter is not None: + next_date = self._apply_jitter(next_date, self.jitter, now) return next_date + def __getstate__(self): + return { + 'version': 2, + 'timezone': self.timezone, + 'start_date': self.start_date, + 'end_date': self.end_date, + 'fields': self.fields, + 'jitter': self.jitter, + } + + def __setstate__(self, state): + # This is for compatibility with APScheduler 3.0.x + if isinstance(state, tuple): + state = state[1] + + if state.get('version', 1) > 2: + raise ValueError( + 'Got serialized data for version %s of %s, but only versions up to 2 can be ' + 'handled' % (state['version'], self.__class__.__name__)) + + self.timezone = state['timezone'] + self.start_date = state['start_date'] + self.end_date = state['end_date'] + self.fields = state['fields'] + self.jitter = state.get('jitter') + def __str__(self): options = ["%s='%s'" % (f.name, f) for f in self.fields if not f.is_default] return 'cron[%s]' % (', '.join(options)) @@ -172,5 +229,11 @@ class CronTrigger(BaseTrigger): def __repr__(self): options = ["%s='%s'" % (f.name, f) for f in self.fields if not f.is_default] if self.start_date: - options.append("start_date='%s'" % datetime_repr(self.start_date)) - return '<%s (%s)>' % (self.__class__.__name__, ', '.join(options)) + options.append("start_date=%r" % datetime_repr(self.start_date)) + if self.end_date: + options.append("end_date=%r" % datetime_repr(self.end_date)) + if self.jitter: + options.append('jitter=%s' % self.jitter) + + return "<%s (%s, timezone='%s')>" % ( + self.__class__.__name__, ', '.join(options), self.timezone) diff --git a/lib/apscheduler/triggers/cron/expressions.py b/lib/apscheduler/triggers/cron/expressions.py index 55272db4..55a37167 100644 --- a/lib/apscheduler/triggers/cron/expressions.py +++ b/lib/apscheduler/triggers/cron/expressions.py @@ -1,17 +1,16 @@ -""" -This module contains the expressions applicable for CronTrigger's fields. 
-""" +"""This module contains the expressions applicable for CronTrigger's fields.""" from calendar import monthrange import re from apscheduler.util import asint -__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression', 'WeekdayPositionExpression', - 'LastDayOfMonthExpression') +__all__ = ('AllExpression', 'RangeExpression', 'WeekdayRangeExpression', + 'WeekdayPositionExpression', 'LastDayOfMonthExpression') WEEKDAYS = ['mon', 'tue', 'wed', 'thu', 'fri', 'sat', 'sun'] +MONTHS = ['jan', 'feb', 'mar', 'apr', 'may', 'jun', 'jul', 'aug', 'sep', 'oct', 'nov', 'dec'] class AllExpression(object): @@ -22,6 +21,14 @@ class AllExpression(object): if self.step == 0: raise ValueError('Increment must be higher than 0') + def validate_range(self, field_name): + from apscheduler.triggers.cron.fields import MIN_VALUES, MAX_VALUES + + value_range = MAX_VALUES[field_name] - MIN_VALUES[field_name] + if self.step and self.step > value_range: + raise ValueError('the step value ({}) is higher than the total range of the ' + 'expression ({})'.format(self.step, value_range)) + def get_next_value(self, date, field): start = field.get_value(date) minval = field.get_min(date) @@ -37,6 +44,9 @@ class AllExpression(object): if next <= maxval: return next + def __eq__(self, other): + return isinstance(other, self.__class__) and self.step == other.step + def __str__(self): if self.step: return '*/%d' % self.step @@ -51,7 +61,7 @@ class RangeExpression(AllExpression): r'(?P\d+)(?:-(?P\d+))?(?:/(?P\d+))?$') def __init__(self, first, last=None, step=None): - AllExpression.__init__(self, step) + super(RangeExpression, self).__init__(step) first = asint(first) last = asint(last) if last is None and step is None: @@ -61,25 +71,41 @@ class RangeExpression(AllExpression): self.first = first self.last = last + def validate_range(self, field_name): + from apscheduler.triggers.cron.fields import MIN_VALUES, MAX_VALUES + + super(RangeExpression, self).validate_range(field_name) + if self.first < MIN_VALUES[field_name]: + raise ValueError('the first value ({}) is lower than the minimum value ({})' + .format(self.first, MIN_VALUES[field_name])) + if self.last is not None and self.last > MAX_VALUES[field_name]: + raise ValueError('the last value ({}) is higher than the maximum value ({})' + .format(self.last, MAX_VALUES[field_name])) + value_range = (self.last or MAX_VALUES[field_name]) - self.first + if self.step and self.step > value_range: + raise ValueError('the step value ({}) is higher than the total range of the ' + 'expression ({})'.format(self.step, value_range)) + def get_next_value(self, date, field): - start = field.get_value(date) + startval = field.get_value(date) minval = field.get_min(date) maxval = field.get_max(date) # Apply range limits minval = max(minval, self.first) - if self.last is not None: - maxval = min(maxval, self.last) - start = max(start, minval) + maxval = min(maxval, self.last) if self.last is not None else maxval + nextval = max(minval, startval) - if not self.step: - next = start - else: - distance_to_next = (self.step - (start - minval)) % self.step - next = start + distance_to_next + # Apply the step if defined + if self.step: + distance_to_next = (self.step - (nextval - minval)) % self.step + nextval += distance_to_next - if next <= maxval: - return next + return nextval if nextval <= maxval else None + + def __eq__(self, other): + return (isinstance(other, self.__class__) and self.first == other.first and + self.last == other.last) def __str__(self): if self.last != self.first 
@@ -100,6 +126,37 @@ class RangeExpression(AllExpression):
         return "%s(%s)" % (self.__class__.__name__, ', '.join(args))


+class MonthRangeExpression(RangeExpression):
+    value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
+
+    def __init__(self, first, last=None):
+        try:
+            first_num = MONTHS.index(first.lower()) + 1
+        except ValueError:
+            raise ValueError('Invalid month name "%s"' % first)
+
+        if last:
+            try:
+                last_num = MONTHS.index(last.lower()) + 1
+            except ValueError:
+                raise ValueError('Invalid month name "%s"' % last)
+        else:
+            last_num = None
+
+        super(MonthRangeExpression, self).__init__(first_num, last_num)
+
+    def __str__(self):
+        if self.last != self.first and self.last is not None:
+            return '%s-%s' % (MONTHS[self.first - 1], MONTHS[self.last - 1])
+        return MONTHS[self.first - 1]
+
+    def __repr__(self):
+        args = ["'%s'" % MONTHS[self.first]]
+        if self.last != self.first and self.last is not None:
+            args.append("'%s'" % MONTHS[self.last - 1])
+        return "%s(%s)" % (self.__class__.__name__, ', '.join(args))
+
+
 class WeekdayRangeExpression(RangeExpression):
     value_re = re.compile(r'(?P<first>[a-z]+)(?:-(?P<last>[a-z]+))?', re.IGNORECASE)
@@ -117,7 +174,7 @@ class WeekdayRangeExpression(RangeExpression):
         else:
             last_num = None

-        RangeExpression.__init__(self, first_num, last_num)
+        super(WeekdayRangeExpression, self).__init__(first_num, last_num)

     def __str__(self):
         if self.last != self.first and self.last is not None:
@@ -133,9 +190,11 @@ class WeekdayRangeExpression(RangeExpression):

 class WeekdayPositionExpression(AllExpression):
     options = ['1st', '2nd', '3rd', '4th', '5th', 'last']
-    value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))' % '|'.join(options), re.IGNORECASE)
+    value_re = re.compile(r'(?P<option_name>%s) +(?P<weekday_name>(?:\d+|\w+))' %
+                          '|'.join(options), re.IGNORECASE)

     def __init__(self, option_name, weekday_name):
+        super(WeekdayPositionExpression, self).__init__(None)
         try:
             self.option_num = self.options.index(option_name.lower())
         except ValueError:
@@ -147,8 +206,7 @@ class WeekdayPositionExpression(AllExpression):
             raise ValueError('Invalid weekday name "%s"' % weekday_name)

     def get_next_value(self, date, field):
-        # Figure out the weekday of the month's first day and the number
-        # of days in that month
+        # Figure out the weekday of the month's first day and the number of days in that month
         first_day_wday, last_day = monthrange(date.year, date.month)

         # Calculate which day of the month is the first of the target weekdays
@@ -160,23 +218,28 @@ class WeekdayPositionExpression(AllExpression):
         if self.option_num < 5:
             target_day = first_hit_day + self.option_num * 7
         else:
-            target_day = first_hit_day + ((last_day - first_hit_day) / 7) * 7
+            target_day = first_hit_day + ((last_day - first_hit_day) // 7) * 7

         if target_day <= last_day and target_day >= date.day:
             return target_day

+    def __eq__(self, other):
+        return (super(WeekdayPositionExpression, self).__eq__(other) and
+                self.option_num == other.option_num and self.weekday == other.weekday)
+
     def __str__(self):
         return '%s %s' % (self.options[self.option_num], WEEKDAYS[self.weekday])

     def __repr__(self):
-        return "%s('%s', '%s')" % (self.__class__.__name__, self.options[self.option_num], WEEKDAYS[self.weekday])
+        return "%s('%s', '%s')" % (self.__class__.__name__, self.options[self.option_num],
+                                   WEEKDAYS[self.weekday])


 class LastDayOfMonthExpression(AllExpression):
     value_re = re.compile(r'last', re.IGNORECASE)

     def __init__(self):
-        pass
+        super(LastDayOfMonthExpression, self).__init__(None)

     def get_next_value(self, date, field):
         return monthrange(date.year, date.month)[1]
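The new `MonthRangeExpression` resolves month names case-insensitively against the `MONTHS` list, and the added `__eq__` methods make parsed expressions comparable by value. An illustrative sketch (not part of the patch; it pokes at internal classes, so details may vary between releases):

```python
from apscheduler.triggers.cron.expressions import MonthRangeExpression, RangeExpression

# Month names are mapped to 1-based month numbers.
expr = MonthRangeExpression('jan', 'mar')
print(expr)                   # jan-mar
print(expr.first, expr.last)  # 1 3

# Equality now compares the parsed values rather than object identity.
print(RangeExpression(1, 5) == RangeExpression(1, 5))  # True
```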
diff --git a/lib/apscheduler/triggers/cron/fields.py b/lib/apscheduler/triggers/cron/fields.py
index e220599f..86d620c4 100644
--- a/lib/apscheduler/triggers/cron/fields.py
+++ b/lib/apscheduler/triggers/cron/fields.py
@@ -1,22 +1,26 @@
-"""
-Fields represent CronTrigger options which map to :class:`~datetime.datetime`
-fields.
-"""
+"""Fields represent CronTrigger options which map to :class:`~datetime.datetime` fields."""

 from calendar import monthrange
+import re
+
+import six

 from apscheduler.triggers.cron.expressions import (
-    AllExpression, RangeExpression, WeekdayPositionExpression, LastDayOfMonthExpression, WeekdayRangeExpression)
+    AllExpression, RangeExpression, WeekdayPositionExpression, LastDayOfMonthExpression,
+    WeekdayRangeExpression, MonthRangeExpression)

-__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField', 'WeekField', 'DayOfMonthField', 'DayOfWeekField')
+__all__ = ('MIN_VALUES', 'MAX_VALUES', 'DEFAULT_VALUES', 'BaseField', 'WeekField',
+           'DayOfMonthField', 'DayOfWeekField')

-MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1, 'day_of_week': 0, 'hour': 0, 'minute': 0, 'second': 0}
-MAX_VALUES = {'year': 2 ** 63, 'month': 12, 'day:': 31, 'week': 53, 'day_of_week': 6, 'hour': 23, 'minute': 59,
-              'second': 59}
-DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*', 'day_of_week': '*', 'hour': 0, 'minute': 0,
-                  'second': 0}
+MIN_VALUES = {'year': 1970, 'month': 1, 'day': 1, 'week': 1, 'day_of_week': 0, 'hour': 0,
+              'minute': 0, 'second': 0}
+MAX_VALUES = {'year': 9999, 'month': 12, 'day': 31, 'week': 53, 'day_of_week': 6, 'hour': 23,
+              'minute': 59, 'second': 59}
+DEFAULT_VALUES = {'year': '*', 'month': 1, 'day': 1, 'week': '*', 'day_of_week': '*', 'hour': 0,
+                  'minute': 0, 'second': 0}
+SEPARATOR = re.compile(' *, *')


 class BaseField(object):
@@ -50,23 +54,29 @@ class BaseField(object):
         self.expressions = []

         # Split a comma-separated expression list, if any
-        exprs = str(exprs).strip()
-        if ',' in exprs:
-            for expr in exprs.split(','):
-                self.compile_expression(expr)
-        else:
-            self.compile_expression(exprs)
+        for expr in SEPARATOR.split(str(exprs).strip()):
+            self.compile_expression(expr)

     def compile_expression(self, expr):
         for compiler in self.COMPILERS:
             match = compiler.value_re.match(expr)
             if match:
                 compiled_expr = compiler(**match.groupdict())
+
+                try:
+                    compiled_expr.validate_range(self.name)
+                except ValueError as e:
+                    exc = ValueError('Error validating expression {!r}: {}'.format(expr, e))
+                    six.raise_from(exc, None)
+
                 self.expressions.append(compiled_expr)
                 return

         raise ValueError('Unrecognized expression "%s" for field "%s"' % (expr, self.name))

+    def __eq__(self, other):
+        return isinstance(self, self.__class__) and self.expressions == other.expressions
+
     def __str__(self):
         expr_strings = (str(e) for e in self.expressions)
         return ','.join(expr_strings)
@@ -95,3 +105,7 @@ class DayOfWeekField(BaseField):

     def get_value(self, dateval):
         return dateval.weekday()
+
+
+class MonthField(BaseField):
+    COMPILERS = BaseField.COMPILERS + [MonthRangeExpression]
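With `validate_range()` now called from `BaseField.compile_expression()`, out-of-range cron values are rejected at construction time with a descriptive message instead of silently producing a trigger that can never fire. A quick illustration (not part of the patch; `BaseField` is an internal API and the exact message text is taken from the hunks above):

```python
from apscheduler.triggers.cron.fields import BaseField

# 'hour' only accepts 0-23, so compiling '0-30' now raises immediately.
try:
    BaseField('hour', '0-30')
except ValueError as exc:
    print(exc)
    # Error validating expression '0-30': the last value (30) is higher than
    # the maximum value (23)
```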
diff --git a/lib/apscheduler/triggers/date.py b/lib/apscheduler/triggers/date.py
index 237e6b4e..07681008 100644
--- a/lib/apscheduler/triggers/date.py
+++ b/lib/apscheduler/triggers/date.py
@@ -14,15 +14,36 @@ class DateTrigger(BaseTrigger):
     :param datetime.tzinfo|str timezone: time zone for ``run_date`` if it doesn't have one already
     """

-    __slots__ = 'timezone', 'run_date'
+    __slots__ = 'run_date'

     def __init__(self, run_date=None, timezone=None):
         timezone = astimezone(timezone) or get_localzone()
-        self.run_date = convert_to_datetime(run_date or datetime.now(), timezone, 'run_date')
+        if run_date is not None:
+            self.run_date = convert_to_datetime(run_date, timezone, 'run_date')
+        else:
+            self.run_date = datetime.now(timezone)

     def get_next_fire_time(self, previous_fire_time, now):
         return self.run_date if previous_fire_time is None else None

+    def __getstate__(self):
+        return {
+            'version': 1,
+            'run_date': self.run_date
+        }
+
+    def __setstate__(self, state):
+        # This is for compatibility with APScheduler 3.0.x
+        if isinstance(state, tuple):
+            state = state[1]
+
+        if state.get('version', 1) > 1:
+            raise ValueError(
+                'Got serialized data for version %s of %s, but only version 1 can be handled' %
+                (state['version'], self.__class__.__name__))
+
+        self.run_date = state['run_date']
+
     def __str__(self):
         return 'date[%s]' % datetime_repr(self.run_date)
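DateTrigger now localizes an explicit `run_date` with the given timezone and falls back to `datetime.now(timezone)` only when no date is passed, so the default fire time is always timezone-aware. A sketch (not part of the patch; the example timezone and the exact rendering of the timezone abbreviation are illustrative):

```python
from datetime import datetime

from pytz import timezone

from apscheduler.triggers.date import DateTrigger

tz = timezone('Europe/Helsinki')

# A naive datetime is localized to the given timezone.
trigger = DateTrigger(run_date=datetime(2018, 1, 1, 12, 0), timezone=tz)
print(trigger)  # date[2018-01-01 12:00:00 EET]

# Omitting run_date now yields an aware "fire immediately" trigger.
print(DateTrigger(timezone=tz).run_date.tzinfo is not None)  # True
```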
""" - __slots__ = 'timezone', 'start_date', 'end_date', 'interval' + __slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter' - def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, end_date=None, timezone=None): - self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, seconds=seconds) + def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None, + end_date=None, timezone=None, jitter=None): + self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes, + seconds=seconds) self.interval_length = timedelta_seconds(self.interval) if self.interval_length == 0: self.interval = timedelta(seconds=1) @@ -33,9 +36,9 @@ class IntervalTrigger(BaseTrigger): if timezone: self.timezone = astimezone(timezone) - elif start_date and start_date.tzinfo: + elif isinstance(start_date, datetime) and start_date.tzinfo: self.timezone = start_date.tzinfo - elif end_date and end_date.tzinfo: + elif isinstance(end_date, datetime) and end_date.tzinfo: self.timezone = end_date.tzinfo else: self.timezone = get_localzone() @@ -44,6 +47,8 @@ class IntervalTrigger(BaseTrigger): self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date') self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date') + self.jitter = jitter + def get_next_fire_time(self, previous_fire_time, now): if previous_fire_time: next_fire_time = previous_fire_time + self.interval @@ -54,12 +59,48 @@ class IntervalTrigger(BaseTrigger): next_interval_num = int(ceil(timediff_seconds / self.interval_length)) next_fire_time = self.start_date + self.interval * next_interval_num + if self.jitter is not None: + next_fire_time = self._apply_jitter(next_fire_time, self.jitter, now) + if not self.end_date or next_fire_time <= self.end_date: return self.timezone.normalize(next_fire_time) + def __getstate__(self): + return { + 'version': 2, + 'timezone': self.timezone, + 'start_date': self.start_date, + 'end_date': self.end_date, + 'interval': self.interval, + 'jitter': self.jitter, + } + + def __setstate__(self, state): + # This is for compatibility with APScheduler 3.0.x + if isinstance(state, tuple): + state = state[1] + + if state.get('version', 1) > 2: + raise ValueError( + 'Got serialized data for version %s of %s, but only versions up to 2 can be ' + 'handled' % (state['version'], self.__class__.__name__)) + + self.timezone = state['timezone'] + self.start_date = state['start_date'] + self.end_date = state['end_date'] + self.interval = state['interval'] + self.interval_length = timedelta_seconds(self.interval) + self.jitter = state.get('jitter') + def __str__(self): return 'interval[%s]' % str(self.interval) def __repr__(self): - return "<%s (interval=%r, start_date='%s')>" % (self.__class__.__name__, self.interval, - datetime_repr(self.start_date)) + options = ['interval=%r' % self.interval, 'start_date=%r' % datetime_repr(self.start_date)] + if self.end_date: + options.append("end_date=%r" % datetime_repr(self.end_date)) + if self.jitter: + options.append('jitter=%s' % self.jitter) + + return "<%s (%s, timezone='%s')>" % ( + self.__class__.__name__, ', '.join(options), self.timezone) diff --git a/lib/apscheduler/util.py b/lib/apscheduler/util.py index 988f9427..63ac8ac8 100644 --- a/lib/apscheduler/util.py +++ b/lib/apscheduler/util.py @@ -2,9 +2,9 @@ from __future__ import division from datetime import date, datetime, time, timedelta, tzinfo -from inspect import isfunction, ismethod, getargspec 
diff --git a/lib/apscheduler/util.py b/lib/apscheduler/util.py
index 988f9427..63ac8ac8 100644
--- a/lib/apscheduler/util.py
+++ b/lib/apscheduler/util.py
@@ -2,9 +2,9 @@ from __future__ import division

 from datetime import date, datetime, time, timedelta, tzinfo
-from inspect import isfunction, ismethod, getargspec
 from calendar import timegm
 import re
+from functools import partial

 from pytz import timezone, utc
 import six
@@ -12,14 +12,16 @@ import six
 try:
     from inspect import signature
 except ImportError:  # pragma: nocover
-    try:
-        from funcsigs import signature
-    except ImportError:
-        signature = None
+    from funcsigs import signature
+
+try:
+    from threading import TIMEOUT_MAX
+except ImportError:
+    TIMEOUT_MAX = 4294967  # Maximum value accepted by Event.wait() on Windows

 __all__ = ('asint', 'asbool', 'astimezone', 'convert_to_datetime', 'datetime_to_utc_timestamp',
-           'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'get_callable_name', 'obj_to_ref',
-           'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args')
+           'utc_timestamp_to_datetime', 'timedelta_seconds', 'datetime_ceil', 'get_callable_name',
+           'obj_to_ref', 'ref_to_obj', 'maybe_ref', 'repr_escape', 'check_callable_args')


 class _Undefined(object):
@@ -32,17 +34,18 @@ class _Undefined(object):
     def __repr__(self):
         return '<undefined>'

+
 undefined = _Undefined()  #: a unique object that only signifies that no value is defined


 def asint(text):
     """
-    Safely converts a string to an integer, returning None if the string is None.
+    Safely converts a string to an integer, returning ``None`` if the string is ``None``.

     :type text: str
     :rtype: int
-    """

+    """
     if text is not None:
         return int(text)
@@ -52,8 +55,8 @@ def asbool(obj):
     Interprets an object as a boolean value.

     :rtype: bool
-    """

+    """
     if isinstance(obj, str):
         obj = obj.strip().lower()
         if obj in ('true', 'yes', 'on', 'y', 't', '1'):
@@ -69,15 +72,19 @@ def astimezone(obj):
     Interprets an object as a timezone.

     :rtype: tzinfo
-    """

+    """
     if isinstance(obj, six.string_types):
         return timezone(obj)
     if isinstance(obj, tzinfo):
         if not hasattr(obj, 'localize') or not hasattr(obj, 'normalize'):
             raise TypeError('Only timezones from the pytz library are supported')
         if obj.zone == 'local':
-            raise ValueError('Unable to determine the name of the local timezone -- use an explicit timezone instead')
+            raise ValueError(
+                'Unable to determine the name of the local timezone -- you must explicitly '
+                'specify the name of the local timezone. Please refrain from using timezones like '
+                'EST to prevent problems with daylight saving time. Instead, use a locale based '
+                'timezone name (such as Europe/Helsinki).')
         return obj
     if obj is not None:
         raise TypeError('Expected tzinfo, got %s instead' % obj.__class__.__name__)
@@ -92,20 +99,20 @@ _DATE_REGEX = re.compile(
 def convert_to_datetime(input, tz, arg_name):
     """
     Converts the given object to a timezone aware datetime object.
+
     If a timezone aware datetime object is passed, it is returned unmodified.
     If a native datetime object is passed, it is given the specified timezone.
     If the input is a string, it is parsed as a datetime with the given timezone.

-    Date strings are accepted in three different forms: date only (Y-m-d),
-    date with time (Y-m-d H:M:S) or with date+time with microseconds
-    (Y-m-d H:M:S.micro).
+    Date strings are accepted in three different forms: date only (Y-m-d), date with time
+    (Y-m-d H:M:S) or with date+time with microseconds (Y-m-d H:M:S.micro).

     :param str|datetime input: the datetime or string to convert to a timezone aware datetime
     :param datetime.tzinfo tz: timezone to interpret ``input`` in
     :param str arg_name: the name of the argument (used in an error message)
     :rtype: datetime
-    """

+    """
     if input is None:
         return
     elif isinstance(input, datetime):
@@ -125,14 +132,16 @@ def convert_to_datetime(input, tz, arg_name):
     if datetime_.tzinfo is not None:
         return datetime_
     if tz is None:
-        raise ValueError('The "tz" argument must be specified if %s has no timezone information' % arg_name)
+        raise ValueError(
+            'The "tz" argument must be specified if %s has no timezone information' % arg_name)
     if isinstance(tz, six.string_types):
         tz = timezone(tz)

     try:
         return tz.localize(datetime_, is_dst=None)
     except AttributeError:
-        raise TypeError('Only pytz timezones are supported (need the localize() and normalize() methods)')
+        raise TypeError(
+            'Only pytz timezones are supported (need the localize() and normalize() methods)')


 def datetime_to_utc_timestamp(timeval):
@@ -141,8 +150,8 @@ def datetime_to_utc_timestamp(timeval):

     :type timeval: datetime
     :rtype: float
-    """

+    """
     if timeval is not None:
         return timegm(timeval.utctimetuple()) + timeval.microsecond / 1000000
@@ -153,8 +162,8 @@ def utc_timestamp_to_datetime(timestamp):

     :type timestamp: float
     :rtype: datetime
-    """

+    """
     if timestamp is not None:
         return datetime.fromtimestamp(timestamp, utc)
@@ -165,8 +174,8 @@ def timedelta_seconds(delta):

     :type delta: timedelta
     :rtype: float
-    """

+    """
     return delta.days * 24 * 60 * 60 + delta.seconds + \
         delta.microseconds / 1000000.0
@@ -176,8 +185,8 @@ def datetime_ceil(dateval):
     Rounds the given datetime object upwards.

     :type dateval: datetime
-    """

+    """
     if dateval.microsecond > 0:
         return dateval + timedelta(seconds=1, microseconds=-dateval.microsecond)
     return dateval
@@ -192,8 +201,8 @@ def get_callable_name(func):
     Returns the best available display name for the given function/callable.

     :rtype: str
-    """

+    """
     # the easy case (on Python 3.3+)
     if hasattr(func, '__qualname__'):
         return func.__qualname__
@@ -222,20 +231,24 @@ def get_callable_name(func):

 def obj_to_ref(obj):
     """
-    Returns the path to the given object.
+    Returns the path to the given callable.

     :rtype: str
+    :raises TypeError: if the given object is not callable
+    :raises ValueError: if the given object is a :class:`~functools.partial`, lambda or a nested
+        function
+
     """
+    if isinstance(obj, partial):
+        raise ValueError('Cannot create a reference to a partial()')

-    try:
-        ref = '%s:%s' % (obj.__module__, get_callable_name(obj))
-        obj2 = ref_to_obj(ref)
-        if obj != obj2:
-            raise ValueError
-    except Exception:
-        raise ValueError('Cannot determine the reference to %r' % obj)
+    name = get_callable_name(obj)
+    if '<lambda>' in name:
+        raise ValueError('Cannot create a reference to a lambda')
+    if '<locals>' in name:
+        raise ValueError('Cannot create a reference to a nested function')

-    return ref
+    return '%s:%s' % (obj.__module__, name)


 def ref_to_obj(ref):
@@ -243,8 +256,8 @@ def ref_to_obj(ref):
     Returns the object pointed to by ``ref``.

     :type ref: str
-    """

+    """
     if not isinstance(ref, six.string_types):
         raise TypeError('References must be strings')
     if ':' not in ref:
@@ -252,12 +265,12 @@ def ref_to_obj(ref):

     modulename, rest = ref.split(':', 1)
     try:
-        obj = __import__(modulename)
+        obj = __import__(modulename, fromlist=[rest])
     except ImportError:
         raise LookupError('Error resolving reference %s: could not import module' % ref)

     try:
-        for name in modulename.split('.')[1:] + rest.split('.'):
+        for name in rest.split('.'):
             obj = getattr(obj, name)
         return obj
     except Exception:
@@ -268,8 +281,8 @@ def maybe_ref(ref):
     """
     Returns the object that the given reference points to, if it is indeed a reference.
     If it is not a reference, the object is returned as-is.
-    """

+    """
     if not isinstance(ref, str):
         return ref
     return ref_to_obj(ref)
@@ -281,7 +294,8 @@ if six.PY2:
             return string.encode('ascii', 'backslashreplace')
         return string
 else:
-    repr_escape = lambda string: string
+    def repr_escape(string):
+        return string


 def check_callable_args(func, args, kwargs):
@@ -290,70 +304,51 @@

     :type args: tuple
     :type kwargs: dict
-    """

+    """
     pos_kwargs_conflicts = []  # parameters that have a match in both args and kwargs
     positional_only_kwargs = []  # positional-only parameters that have a match in kwargs
     unsatisfied_args = []  # parameters in signature that don't have a match in args or kwargs
     unsatisfied_kwargs = []  # keyword-only arguments that don't have a match in kwargs
     unmatched_args = list(args)  # args that didn't match any of the parameters in the signature
-    unmatched_kwargs = list(kwargs)  # kwargs that didn't match any of the parameters in the signature
-    has_varargs = has_var_kwargs = False  # indicates if the signature defines *args and **kwargs respectively
+    # kwargs that didn't match any of the parameters in the signature
+    unmatched_kwargs = list(kwargs)
+    # indicates if the signature defines *args and **kwargs respectively
+    has_varargs = has_var_kwargs = False

-    if signature:
-        try:
-            sig = signature(func)
-        except ValueError:
-            return  # signature() doesn't work against every kind of callable
+    try:
+        sig = signature(func)
+    except ValueError:
+        # signature() doesn't work against every kind of callable
+        return

-        for param in six.itervalues(sig.parameters):
-            if param.kind == param.POSITIONAL_OR_KEYWORD:
-                if param.name in unmatched_kwargs and unmatched_args:
-                    pos_kwargs_conflicts.append(param.name)
-                elif unmatched_args:
-                    del unmatched_args[0]
-                elif param.name in unmatched_kwargs:
-                    unmatched_kwargs.remove(param.name)
-                elif param.default is param.empty:
-                    unsatisfied_args.append(param.name)
-            elif param.kind == param.POSITIONAL_ONLY:
-                if unmatched_args:
-                    del unmatched_args[0]
-                elif param.name in unmatched_kwargs:
-                    unmatched_kwargs.remove(param.name)
-                    positional_only_kwargs.append(param.name)
-                elif param.default is param.empty:
-                    unsatisfied_args.append(param.name)
-            elif param.kind == param.KEYWORD_ONLY:
-                if param.name in unmatched_kwargs:
-                    unmatched_kwargs.remove(param.name)
-                elif param.default is param.empty:
-                    unsatisfied_kwargs.append(param.name)
-            elif param.kind == param.VAR_POSITIONAL:
-                has_varargs = True
-            elif param.kind == param.VAR_KEYWORD:
-                has_var_kwargs = True
-    else:
-        if not isfunction(func) and not ismethod(func) and hasattr(func, '__call__'):
-            func = func.__call__
-
-        try:
-            argspec = getargspec(func)
-        except TypeError:
-            return  # getargspec() doesn't work certain callables
-
-        argspec_args = argspec.args if not ismethod(func) else argspec.args[1:]
-        has_varargs = bool(argspec.varargs)
-        has_var_kwargs = bool(argspec.keywords)
-        for arg, default in six.moves.zip_longest(argspec_args, argspec.defaults or (), fillvalue=undefined):
-            if arg in unmatched_kwargs and unmatched_args:
-                pos_kwargs_conflicts.append(arg)
+    for param in six.itervalues(sig.parameters):
+        if param.kind == param.POSITIONAL_OR_KEYWORD:
+            if param.name in unmatched_kwargs and unmatched_args:
+                pos_kwargs_conflicts.append(param.name)
             elif unmatched_args:
                 del unmatched_args[0]
-            elif arg in unmatched_kwargs:
-                unmatched_kwargs.remove(arg)
-            elif default is undefined:
-                unsatisfied_args.append(arg)
+            elif param.name in unmatched_kwargs:
+                unmatched_kwargs.remove(param.name)
+            elif param.default is param.empty:
+                unsatisfied_args.append(param.name)
+        elif param.kind == param.POSITIONAL_ONLY:
+            if unmatched_args:
+                del unmatched_args[0]
+            elif param.name in unmatched_kwargs:
+                unmatched_kwargs.remove(param.name)
+                positional_only_kwargs.append(param.name)
+            elif param.default is param.empty:
+                unsatisfied_args.append(param.name)
+        elif param.kind == param.KEYWORD_ONLY:
+            if param.name in unmatched_kwargs:
+                unmatched_kwargs.remove(param.name)
+            elif param.default is param.empty:
+                unsatisfied_kwargs.append(param.name)
+        elif param.kind == param.VAR_POSITIONAL:
+            has_varargs = True
+        elif param.kind == param.VAR_KEYWORD:
+            has_var_kwargs = True

     # Make sure there are no conflicts between args and kwargs
     if pos_kwargs_conflicts:
@@ -365,21 +360,26 @@ def check_callable_args(func, args, kwargs):
         raise ValueError('The following arguments cannot be given as keyword arguments: %s' %
                          ', '.join(positional_only_kwargs))

-    # Check that the number of positional arguments minus the number of matched kwargs matches the argspec
+    # Check that the number of positional arguments minus the number of matched kwargs matches the
+    # argspec
     if unsatisfied_args:
-        raise ValueError('The following arguments have not been supplied: %s' % ', '.join(unsatisfied_args))
+        raise ValueError('The following arguments have not been supplied: %s' %
+                         ', '.join(unsatisfied_args))

     # Check that all keyword-only arguments have been supplied
     if unsatisfied_kwargs:
-        raise ValueError('The following keyword-only arguments have not been supplied in kwargs: %s' %
-                         ', '.join(unsatisfied_kwargs))
+        raise ValueError(
+            'The following keyword-only arguments have not been supplied in kwargs: %s' %
+            ', '.join(unsatisfied_kwargs))

     # Check that the callable can accept the given number of positional arguments
     if not has_varargs and unmatched_args:
-        raise ValueError('The list of positional arguments is longer than the target callable can handle '
-                         '(allowed: %d, given in args: %d)' % (len(args) - len(unmatched_args), len(args)))
+        raise ValueError(
+            'The list of positional arguments is longer than the target callable can handle '
+            '(allowed: %d, given in args: %d)' % (len(args) - len(unmatched_args), len(args)))

     # Check that the callable can accept the given keyword arguments
     if not has_var_kwargs and unmatched_kwargs:
-        raise ValueError('The target callable does not accept the following keyword arguments: %s' %
-                         ', '.join(unmatched_kwargs))
+        raise ValueError(
+            'The target callable does not accept the following keyword arguments: %s' %
+            ', '.join(unmatched_kwargs))
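Taken together, the util.py changes make reference handling stricter: `obj_to_ref()` rejects partials, lambdas and nested functions up front instead of attempting a lossy round trip, and `ref_to_obj()` imports the deepest module directly via `fromlist` rather than walking attributes from the top-level package. A final sketch (not part of the patch):

```python
from functools import partial

from apscheduler.util import obj_to_ref, ref_to_obj

# References round-trip cleanly for module-level callables.
ref = obj_to_ref(ref_to_obj)
print(ref)                            # apscheduler.util:ref_to_obj
print(ref_to_obj(ref) is ref_to_obj)  # True

# Unreferencable callables now fail fast with a specific error.
try:
    obj_to_ref(partial(print, 'x'))
except ValueError as exc:
    print(exc)  # Cannot create a reference to a partial()
```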