diff --git a/lib/apscheduler/executors/base_py3.py b/lib/apscheduler/executors/base_py3.py index 61abd842..7111d2ae 100644 --- a/lib/apscheduler/executors/base_py3.py +++ b/lib/apscheduler/executors/base_py3.py @@ -1,5 +1,6 @@ import logging import sys +import traceback from datetime import datetime, timedelta from traceback import format_tb @@ -33,6 +34,7 @@ async def run_coroutine_job(job, jobstore_alias, run_times, logger_name): events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time, exception=exc, traceback=formatted_tb)) logger.exception('Job "%s" raised an exception', job) + traceback.clear_frames(tb) else: events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time, retval=retval)) diff --git a/lib/apscheduler/executors/pool.py b/lib/apscheduler/executors/pool.py index 2f4ef455..c85896ec 100644 --- a/lib/apscheduler/executors/pool.py +++ b/lib/apscheduler/executors/pool.py @@ -3,6 +3,11 @@ import concurrent.futures from apscheduler.executors.base import BaseExecutor, run_job +try: + from concurrent.futures.process import BrokenProcessPool +except ImportError: + BrokenProcessPool = None + class BasePoolExecutor(BaseExecutor): @abstractmethod @@ -19,7 +24,13 @@ class BasePoolExecutor(BaseExecutor): else: self._run_job_success(job.id, f.result()) - f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + try: + f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + except BrokenProcessPool: + self._logger.warning('Process pool is broken; replacing pool with a fresh instance') + self._pool = self._pool.__class__(self._pool._max_workers) + f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name) + f.add_done_callback(callback) def shutdown(self, wait=True): @@ -33,10 +44,13 @@ class ThreadPoolExecutor(BasePoolExecutor): Plugin alias: ``threadpool`` :param max_workers: the maximum number of spawned threads. + :param pool_kwargs: dict of keyword arguments to pass to the underlying + ThreadPoolExecutor constructor """ - def __init__(self, max_workers=10): - pool = concurrent.futures.ThreadPoolExecutor(int(max_workers)) + def __init__(self, max_workers=10, pool_kwargs=None): + pool_kwargs = pool_kwargs or {} + pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs) super(ThreadPoolExecutor, self).__init__(pool) @@ -47,8 +61,11 @@ class ProcessPoolExecutor(BasePoolExecutor): Plugin alias: ``processpool`` :param max_workers: the maximum number of spawned processes. + :param pool_kwargs: dict of keyword arguments to pass to the underlying + ProcessPoolExecutor constructor """ - def __init__(self, max_workers=10): - pool = concurrent.futures.ProcessPoolExecutor(int(max_workers)) + def __init__(self, max_workers=10, pool_kwargs=None): + pool_kwargs = pool_kwargs or {} + pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs) super(ProcessPoolExecutor, self).__init__(pool) diff --git a/lib/apscheduler/job.py b/lib/apscheduler/job.py index d676ca89..445d9a86 100644 --- a/lib/apscheduler/job.py +++ b/lib/apscheduler/job.py @@ -28,7 +28,7 @@ class Job(object): :var trigger: the trigger object that controls the schedule of this job :var str executor: the name of the executor that will run this job :var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to - be late + be late (``None`` means "allow the job to run no matter how late it is") :var int max_instances: the maximum number of concurrently executing instances allowed for this job :var datetime.datetime next_run_time: the next scheduled run time of this job @@ -40,7 +40,7 @@ class Job(object): __slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref', 'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances', - 'next_run_time') + 'next_run_time', '__weakref__') def __init__(self, scheduler, id=None, **kwargs): super(Job, self).__init__() @@ -242,8 +242,9 @@ class Job(object): # Instance methods cannot survive serialization as-is, so store the "self" argument # explicitly - if ismethod(self.func) and not isclass(self.func.__self__): - args = (self.func.__self__,) + tuple(self.args) + func = self.func + if ismethod(func) and not isclass(func.__self__) and obj_to_ref(func) == self.func_ref: + args = (func.__self__,) + tuple(self.args) else: args = self.args diff --git a/lib/apscheduler/jobstores/mongodb.py b/lib/apscheduler/jobstores/mongodb.py index 7dbc3b12..ea3097dd 100644 --- a/lib/apscheduler/jobstores/mongodb.py +++ b/lib/apscheduler/jobstores/mongodb.py @@ -54,7 +54,7 @@ class MongoDBJobStore(BaseJobStore): def start(self, scheduler, alias): super(MongoDBJobStore, self).start(scheduler, alias) - self.collection.ensure_index('next_run_time', sparse=True) + self.collection.create_index('next_run_time', sparse=True) @property def connection(self): @@ -83,7 +83,7 @@ class MongoDBJobStore(BaseJobStore): def add_job(self, job): try: - self.collection.insert({ + self.collection.insert_one({ '_id': job.id, 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) @@ -96,13 +96,13 @@ class MongoDBJobStore(BaseJobStore): 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol)) } - result = self.collection.update({'_id': job.id}, {'$set': changes}) - if result and result['n'] == 0: + result = self.collection.update_one({'_id': job.id}, {'$set': changes}) + if result and result.matched_count == 0: raise JobLookupError(job.id) def remove_job(self, job_id): - result = self.collection.remove(job_id) - if result and result['n'] == 0: + result = self.collection.delete_one({'_id': job_id}) + if result and result.deleted_count == 0: raise JobLookupError(job_id) def remove_all_jobs(self): diff --git a/lib/apscheduler/jobstores/sqlalchemy.py b/lib/apscheduler/jobstores/sqlalchemy.py index fecbd834..dcfd3e56 100644 --- a/lib/apscheduler/jobstores/sqlalchemy.py +++ b/lib/apscheduler/jobstores/sqlalchemy.py @@ -11,7 +11,7 @@ except ImportError: # pragma: nocover try: from sqlalchemy import ( - create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select) + create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select, and_) from sqlalchemy.exc import IntegrityError from sqlalchemy.sql.expression import null except ImportError: # pragma: nocover @@ -134,7 +134,7 @@ class SQLAlchemyJobStore(BaseJobStore): jobs = [] selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\ order_by(self.jobs_t.c.next_run_time) - selectable = selectable.where(*conditions) if conditions else selectable + selectable = selectable.where(and_(*conditions)) if conditions else selectable failed_job_ids = set() for row in self.engine.execute(selectable): try: diff --git a/lib/apscheduler/jobstores/zookeeper.py b/lib/apscheduler/jobstores/zookeeper.py index 2cca83e8..52530693 100644 --- a/lib/apscheduler/jobstores/zookeeper.py +++ b/lib/apscheduler/jobstores/zookeeper.py @@ -1,6 +1,5 @@ from __future__ import absolute_import -import os from datetime import datetime from pytz import utc @@ -65,7 +64,7 @@ class ZooKeeperJobStore(BaseJobStore): def lookup_job(self, job_id): self._ensure_paths() - node_path = os.path.join(self.path, job_id) + node_path = self.path + "/" + str(job_id) try: content, _ = self.client.get(node_path) doc = pickle.loads(content) @@ -92,7 +91,7 @@ class ZooKeeperJobStore(BaseJobStore): def add_job(self, job): self._ensure_paths() - node_path = os.path.join(self.path, str(job.id)) + node_path = self.path + "/" + str(job.id) value = { 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': job.__getstate__() @@ -105,7 +104,7 @@ class ZooKeeperJobStore(BaseJobStore): def update_job(self, job): self._ensure_paths() - node_path = os.path.join(self.path, str(job.id)) + node_path = self.path + "/" + str(job.id) changes = { 'next_run_time': datetime_to_utc_timestamp(job.next_run_time), 'job_state': job.__getstate__() @@ -118,7 +117,7 @@ class ZooKeeperJobStore(BaseJobStore): def remove_job(self, job_id): self._ensure_paths() - node_path = os.path.join(self.path, str(job_id)) + node_path = self.path + "/" + str(job_id) try: self.client.delete(node_path) except NoNodeError: @@ -151,7 +150,7 @@ class ZooKeeperJobStore(BaseJobStore): all_ids = self.client.get_children(self.path) for node_name in all_ids: try: - node_path = os.path.join(self.path, node_name) + node_path = self.path + "/" + node_name content, _ = self.client.get(node_path) doc = pickle.loads(content) job_def = { diff --git a/lib/apscheduler/schedulers/asyncio.py b/lib/apscheduler/schedulers/asyncio.py index 289ef13f..70ebedeb 100644 --- a/lib/apscheduler/schedulers/asyncio.py +++ b/lib/apscheduler/schedulers/asyncio.py @@ -38,13 +38,19 @@ class AsyncIOScheduler(BaseScheduler): _eventloop = None _timeout = None + def start(self, paused=False): + if not self._eventloop: + self._eventloop = asyncio.get_event_loop() + + super(AsyncIOScheduler, self).start(paused) + @run_in_event_loop def shutdown(self, wait=True): super(AsyncIOScheduler, self).shutdown(wait) self._stop_timer() def _configure(self, config): - self._eventloop = maybe_ref(config.pop('event_loop', None)) or asyncio.get_event_loop() + self._eventloop = maybe_ref(config.pop('event_loop', None)) super(AsyncIOScheduler, self)._configure(config) def _start_timer(self, wait_seconds): diff --git a/lib/apscheduler/schedulers/background.py b/lib/apscheduler/schedulers/background.py index 03f29822..bb8f77da 100644 --- a/lib/apscheduler/schedulers/background.py +++ b/lib/apscheduler/schedulers/background.py @@ -29,7 +29,9 @@ class BackgroundScheduler(BlockingScheduler): super(BackgroundScheduler, self)._configure(config) def start(self, *args, **kwargs): - self._event = Event() + if self._event is None or self._event.is_set(): + self._event = Event() + BaseScheduler.start(self, *args, **kwargs) self._thread = Thread(target=self._main_loop, name='APScheduler') self._thread.daemon = self._daemon diff --git a/lib/apscheduler/schedulers/base.py b/lib/apscheduler/schedulers/base.py index 8e711549..3dfb7437 100644 --- a/lib/apscheduler/schedulers/base.py +++ b/lib/apscheduler/schedulers/base.py @@ -86,6 +86,11 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): self.state = STATE_STOPPED self.configure(gconfig, **options) + def __getstate__(self): + raise TypeError("Schedulers cannot be serialized. Ensure that you are not passing a " + "scheduler instance as an argument to a job, or scheduling an instance " + "method where the instance contains a scheduler as an attribute.") + def configure(self, gconfig={}, prefix='apscheduler.', **options): """ Reconfigures the scheduler with the given options. @@ -402,7 +407,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)): :param str|unicode id: explicit identifier for the job (for modifying it later) :param str|unicode name: textual description of the job :param int misfire_grace_time: seconds after the designated runtime that the job is still - allowed to be run + allowed to be run (or ``None`` to allow the job to run no matter how late it is) :param bool coalesce: run once instead of many times if the scheduler determines that the job should be run more than once in succession :param int max_instances: maximum number of concurrently running instances allowed for this diff --git a/lib/apscheduler/schedulers/blocking.py b/lib/apscheduler/schedulers/blocking.py index e6171575..4ecc9f6f 100644 --- a/lib/apscheduler/schedulers/blocking.py +++ b/lib/apscheduler/schedulers/blocking.py @@ -14,7 +14,9 @@ class BlockingScheduler(BaseScheduler): _event = None def start(self, *args, **kwargs): - self._event = Event() + if self._event is None or self._event.is_set(): + self._event = Event() + super(BlockingScheduler, self).start(*args, **kwargs) self._main_loop() diff --git a/lib/apscheduler/schedulers/qt.py b/lib/apscheduler/schedulers/qt.py index 0329a000..dda77d79 100644 --- a/lib/apscheduler/schedulers/qt.py +++ b/lib/apscheduler/schedulers/qt.py @@ -9,9 +9,13 @@ except (ImportError, RuntimeError): # pragma: nocover from PyQt4.QtCore import QObject, QTimer except ImportError: try: - from PySide.QtCore import QObject, QTimer # noqa + from PySide2.QtCore import QObject, QTimer # noqa except ImportError: - raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed') + try: + from PySide.QtCore import QObject, QTimer # noqa + except ImportError: + raise ImportError('QtScheduler requires either PyQt5, PyQt4, PySide2 ' + 'or PySide installed') class QtScheduler(BaseScheduler): diff --git a/lib/apscheduler/triggers/base.py b/lib/apscheduler/triggers/base.py index ce2526a8..55d010db 100644 --- a/lib/apscheduler/triggers/base.py +++ b/lib/apscheduler/triggers/base.py @@ -22,27 +22,16 @@ class BaseTrigger(six.with_metaclass(ABCMeta)): def _apply_jitter(self, next_fire_time, jitter, now): """ - Randomize ``next_fire_time`` by adding or subtracting a random value (the jitter). If the - resulting datetime is in the past, returns the initial ``next_fire_time`` without jitter. - - ``next_fire_time - jitter <= result <= next_fire_time + jitter`` + Randomize ``next_fire_time`` by adding a random value (the jitter). :param datetime.datetime|None next_fire_time: next fire time without jitter applied. If ``None``, returns ``None``. - :param int|None jitter: maximum number of seconds to add or subtract to - ``next_fire_time``. If ``None`` or ``0``, returns ``next_fire_time`` + :param int|None jitter: maximum number of seconds to add to ``next_fire_time`` + (if ``None`` or ``0``, returns ``next_fire_time``) :param datetime.datetime now: current datetime :return datetime.datetime|None: next fire time with a jitter. """ if next_fire_time is None or not jitter: return next_fire_time - next_fire_time_with_jitter = next_fire_time + timedelta( - seconds=random.uniform(-jitter, jitter)) - - if next_fire_time_with_jitter < now: - # Next fire time with jitter is in the past. - # Ignore jitter to avoid false misfire. - return next_fire_time - - return next_fire_time_with_jitter + return next_fire_time + timedelta(seconds=random.uniform(0, jitter)) diff --git a/lib/apscheduler/triggers/combining.py b/lib/apscheduler/triggers/combining.py index 64f83011..bb900061 100644 --- a/lib/apscheduler/triggers/combining.py +++ b/lib/apscheduler/triggers/combining.py @@ -45,7 +45,7 @@ class AndTrigger(BaseCombiningTrigger): Trigger alias: ``and`` :param list triggers: triggers to combine - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most """ __slots__ = () @@ -73,7 +73,7 @@ class OrTrigger(BaseCombiningTrigger): Trigger alias: ``or`` :param list triggers: triggers to combine - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most .. note:: Triggers that depends on the previous fire time, such as the interval trigger, may seem to behave strangely since they are always passed the previous fire time produced by diff --git a/lib/apscheduler/triggers/cron/__init__.py b/lib/apscheduler/triggers/cron/__init__.py index ce675dd9..fec6e3b5 100644 --- a/lib/apscheduler/triggers/cron/__init__.py +++ b/lib/apscheduler/triggers/cron/__init__.py @@ -16,7 +16,7 @@ class CronTrigger(BaseTrigger): :param int|str year: 4-digit year :param int|str month: month (1-12) - :param int|str day: day of the (1-31) + :param int|str day: day of month (1-31) :param int|str week: ISO week (1-53) :param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun) :param int|str hour: hour (0-23) @@ -26,7 +26,7 @@ class CronTrigger(BaseTrigger): :param datetime|str end_date: latest possible date/time to trigger on (inclusive) :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults to scheduler timezone) - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most .. note:: The first weekday is always **monday**. """ diff --git a/lib/apscheduler/triggers/interval.py b/lib/apscheduler/triggers/interval.py index 831ba383..61094aa1 100644 --- a/lib/apscheduler/triggers/interval.py +++ b/lib/apscheduler/triggers/interval.py @@ -20,7 +20,7 @@ class IntervalTrigger(BaseTrigger): :param datetime|str start_date: starting point for the interval calculation :param datetime|str end_date: latest possible date/time to trigger on :param datetime.tzinfo|str timezone: time zone to use for the date/time calculations - :param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most. + :param int|None jitter: delay the job execution by ``jitter`` seconds at most """ __slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter' diff --git a/lib/apscheduler/util.py b/lib/apscheduler/util.py index 8b7b3f5e..1e643bff 100644 --- a/lib/apscheduler/util.py +++ b/lib/apscheduler/util.py @@ -7,6 +7,7 @@ from calendar import timegm from functools import partial from inspect import isclass, ismethod import re +import sys from pytz import timezone, utc, FixedOffset import six @@ -352,7 +353,10 @@ def check_callable_args(func, args, kwargs): has_varargs = has_var_kwargs = False try: - sig = signature(func) + if sys.version_info >= (3, 5): + sig = signature(func, follow_wrapped=False) + else: + sig = signature(func) except ValueError: # signature() doesn't work against every kind of callable return