Update apshceduler-3.8.0

This commit is contained in:
JonnyWong16 2021-10-14 20:36:43 -07:00
parent ec3702c358
commit dd32f1e5b4
No known key found for this signature in database
GPG key ID: B1F1F9807184697A
16 changed files with 81 additions and 50 deletions

View file

@ -1,5 +1,6 @@
import logging
import sys
import traceback
from datetime import datetime, timedelta
from traceback import format_tb
@ -33,6 +34,7 @@ async def run_coroutine_job(job, jobstore_alias, run_times, logger_name):
events.append(JobExecutionEvent(EVENT_JOB_ERROR, job.id, jobstore_alias, run_time,
exception=exc, traceback=formatted_tb))
logger.exception('Job "%s" raised an exception', job)
traceback.clear_frames(tb)
else:
events.append(JobExecutionEvent(EVENT_JOB_EXECUTED, job.id, jobstore_alias, run_time,
retval=retval))

View file

@ -3,6 +3,11 @@ import concurrent.futures
from apscheduler.executors.base import BaseExecutor, run_job
try:
from concurrent.futures.process import BrokenProcessPool
except ImportError:
BrokenProcessPool = None
class BasePoolExecutor(BaseExecutor):
@abstractmethod
@ -19,7 +24,13 @@ class BasePoolExecutor(BaseExecutor):
else:
self._run_job_success(job.id, f.result())
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
try:
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
except BrokenProcessPool:
self._logger.warning('Process pool is broken; replacing pool with a fresh instance')
self._pool = self._pool.__class__(self._pool._max_workers)
f = self._pool.submit(run_job, job, job._jobstore_alias, run_times, self._logger.name)
f.add_done_callback(callback)
def shutdown(self, wait=True):
@ -33,10 +44,13 @@ class ThreadPoolExecutor(BasePoolExecutor):
Plugin alias: ``threadpool``
:param max_workers: the maximum number of spawned threads.
:param pool_kwargs: dict of keyword arguments to pass to the underlying
ThreadPoolExecutor constructor
"""
def __init__(self, max_workers=10):
pool = concurrent.futures.ThreadPoolExecutor(int(max_workers))
def __init__(self, max_workers=10, pool_kwargs=None):
pool_kwargs = pool_kwargs or {}
pool = concurrent.futures.ThreadPoolExecutor(int(max_workers), **pool_kwargs)
super(ThreadPoolExecutor, self).__init__(pool)
@ -47,8 +61,11 @@ class ProcessPoolExecutor(BasePoolExecutor):
Plugin alias: ``processpool``
:param max_workers: the maximum number of spawned processes.
:param pool_kwargs: dict of keyword arguments to pass to the underlying
ProcessPoolExecutor constructor
"""
def __init__(self, max_workers=10):
pool = concurrent.futures.ProcessPoolExecutor(int(max_workers))
def __init__(self, max_workers=10, pool_kwargs=None):
pool_kwargs = pool_kwargs or {}
pool = concurrent.futures.ProcessPoolExecutor(int(max_workers), **pool_kwargs)
super(ProcessPoolExecutor, self).__init__(pool)

View file

@ -28,7 +28,7 @@ class Job(object):
:var trigger: the trigger object that controls the schedule of this job
:var str executor: the name of the executor that will run this job
:var int misfire_grace_time: the time (in seconds) how much this job's execution is allowed to
be late
be late (``None`` means "allow the job to run no matter how late it is")
:var int max_instances: the maximum number of concurrently executing instances allowed for this
job
:var datetime.datetime next_run_time: the next scheduled run time of this job
@ -40,7 +40,7 @@ class Job(object):
__slots__ = ('_scheduler', '_jobstore_alias', 'id', 'trigger', 'executor', 'func', 'func_ref',
'args', 'kwargs', 'name', 'misfire_grace_time', 'coalesce', 'max_instances',
'next_run_time')
'next_run_time', '__weakref__')
def __init__(self, scheduler, id=None, **kwargs):
super(Job, self).__init__()
@ -242,8 +242,9 @@ class Job(object):
# Instance methods cannot survive serialization as-is, so store the "self" argument
# explicitly
if ismethod(self.func) and not isclass(self.func.__self__):
args = (self.func.__self__,) + tuple(self.args)
func = self.func
if ismethod(func) and not isclass(func.__self__) and obj_to_ref(func) == self.func_ref:
args = (func.__self__,) + tuple(self.args)
else:
args = self.args

View file

@ -54,7 +54,7 @@ class MongoDBJobStore(BaseJobStore):
def start(self, scheduler, alias):
super(MongoDBJobStore, self).start(scheduler, alias)
self.collection.ensure_index('next_run_time', sparse=True)
self.collection.create_index('next_run_time', sparse=True)
@property
def connection(self):
@ -83,7 +83,7 @@ class MongoDBJobStore(BaseJobStore):
def add_job(self, job):
try:
self.collection.insert({
self.collection.insert_one({
'_id': job.id,
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
@ -96,13 +96,13 @@ class MongoDBJobStore(BaseJobStore):
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': Binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
}
result = self.collection.update({'_id': job.id}, {'$set': changes})
if result and result['n'] == 0:
result = self.collection.update_one({'_id': job.id}, {'$set': changes})
if result and result.matched_count == 0:
raise JobLookupError(job.id)
def remove_job(self, job_id):
result = self.collection.remove(job_id)
if result and result['n'] == 0:
result = self.collection.delete_one({'_id': job_id})
if result and result.deleted_count == 0:
raise JobLookupError(job_id)
def remove_all_jobs(self):

View file

@ -11,7 +11,7 @@ except ImportError: # pragma: nocover
try:
from sqlalchemy import (
create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select)
create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select, and_)
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql.expression import null
except ImportError: # pragma: nocover
@ -134,7 +134,7 @@ class SQLAlchemyJobStore(BaseJobStore):
jobs = []
selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\
order_by(self.jobs_t.c.next_run_time)
selectable = selectable.where(*conditions) if conditions else selectable
selectable = selectable.where(and_(*conditions)) if conditions else selectable
failed_job_ids = set()
for row in self.engine.execute(selectable):
try:

View file

@ -1,6 +1,5 @@
from __future__ import absolute_import
import os
from datetime import datetime
from pytz import utc
@ -65,7 +64,7 @@ class ZooKeeperJobStore(BaseJobStore):
def lookup_job(self, job_id):
self._ensure_paths()
node_path = os.path.join(self.path, job_id)
node_path = self.path + "/" + str(job_id)
try:
content, _ = self.client.get(node_path)
doc = pickle.loads(content)
@ -92,7 +91,7 @@ class ZooKeeperJobStore(BaseJobStore):
def add_job(self, job):
self._ensure_paths()
node_path = os.path.join(self.path, str(job.id))
node_path = self.path + "/" + str(job.id)
value = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': job.__getstate__()
@ -105,7 +104,7 @@ class ZooKeeperJobStore(BaseJobStore):
def update_job(self, job):
self._ensure_paths()
node_path = os.path.join(self.path, str(job.id))
node_path = self.path + "/" + str(job.id)
changes = {
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
'job_state': job.__getstate__()
@ -118,7 +117,7 @@ class ZooKeeperJobStore(BaseJobStore):
def remove_job(self, job_id):
self._ensure_paths()
node_path = os.path.join(self.path, str(job_id))
node_path = self.path + "/" + str(job_id)
try:
self.client.delete(node_path)
except NoNodeError:
@ -151,7 +150,7 @@ class ZooKeeperJobStore(BaseJobStore):
all_ids = self.client.get_children(self.path)
for node_name in all_ids:
try:
node_path = os.path.join(self.path, node_name)
node_path = self.path + "/" + node_name
content, _ = self.client.get(node_path)
doc = pickle.loads(content)
job_def = {

View file

@ -38,13 +38,19 @@ class AsyncIOScheduler(BaseScheduler):
_eventloop = None
_timeout = None
def start(self, paused=False):
if not self._eventloop:
self._eventloop = asyncio.get_event_loop()
super(AsyncIOScheduler, self).start(paused)
@run_in_event_loop
def shutdown(self, wait=True):
super(AsyncIOScheduler, self).shutdown(wait)
self._stop_timer()
def _configure(self, config):
self._eventloop = maybe_ref(config.pop('event_loop', None)) or asyncio.get_event_loop()
self._eventloop = maybe_ref(config.pop('event_loop', None))
super(AsyncIOScheduler, self)._configure(config)
def _start_timer(self, wait_seconds):

View file

@ -29,7 +29,9 @@ class BackgroundScheduler(BlockingScheduler):
super(BackgroundScheduler, self)._configure(config)
def start(self, *args, **kwargs):
self._event = Event()
if self._event is None or self._event.is_set():
self._event = Event()
BaseScheduler.start(self, *args, **kwargs)
self._thread = Thread(target=self._main_loop, name='APScheduler')
self._thread.daemon = self._daemon

View file

@ -86,6 +86,11 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
self.state = STATE_STOPPED
self.configure(gconfig, **options)
def __getstate__(self):
raise TypeError("Schedulers cannot be serialized. Ensure that you are not passing a "
"scheduler instance as an argument to a job, or scheduling an instance "
"method where the instance contains a scheduler as an attribute.")
def configure(self, gconfig={}, prefix='apscheduler.', **options):
"""
Reconfigures the scheduler with the given options.
@ -402,7 +407,7 @@ class BaseScheduler(six.with_metaclass(ABCMeta)):
:param str|unicode id: explicit identifier for the job (for modifying it later)
:param str|unicode name: textual description of the job
:param int misfire_grace_time: seconds after the designated runtime that the job is still
allowed to be run
allowed to be run (or ``None`` to allow the job to run no matter how late it is)
:param bool coalesce: run once instead of many times if the scheduler determines that the
job should be run more than once in succession
:param int max_instances: maximum number of concurrently running instances allowed for this

View file

@ -14,7 +14,9 @@ class BlockingScheduler(BaseScheduler):
_event = None
def start(self, *args, **kwargs):
self._event = Event()
if self._event is None or self._event.is_set():
self._event = Event()
super(BlockingScheduler, self).start(*args, **kwargs)
self._main_loop()

View file

@ -9,9 +9,13 @@ except (ImportError, RuntimeError): # pragma: nocover
from PyQt4.QtCore import QObject, QTimer
except ImportError:
try:
from PySide.QtCore import QObject, QTimer # noqa
from PySide2.QtCore import QObject, QTimer # noqa
except ImportError:
raise ImportError('QtScheduler requires either PyQt5, PyQt4 or PySide installed')
try:
from PySide.QtCore import QObject, QTimer # noqa
except ImportError:
raise ImportError('QtScheduler requires either PyQt5, PyQt4, PySide2 '
'or PySide installed')
class QtScheduler(BaseScheduler):

View file

@ -22,27 +22,16 @@ class BaseTrigger(six.with_metaclass(ABCMeta)):
def _apply_jitter(self, next_fire_time, jitter, now):
"""
Randomize ``next_fire_time`` by adding or subtracting a random value (the jitter). If the
resulting datetime is in the past, returns the initial ``next_fire_time`` without jitter.
``next_fire_time - jitter <= result <= next_fire_time + jitter``
Randomize ``next_fire_time`` by adding a random value (the jitter).
:param datetime.datetime|None next_fire_time: next fire time without jitter applied. If
``None``, returns ``None``.
:param int|None jitter: maximum number of seconds to add or subtract to
``next_fire_time``. If ``None`` or ``0``, returns ``next_fire_time``
:param int|None jitter: maximum number of seconds to add to ``next_fire_time``
(if ``None`` or ``0``, returns ``next_fire_time``)
:param datetime.datetime now: current datetime
:return datetime.datetime|None: next fire time with a jitter.
"""
if next_fire_time is None or not jitter:
return next_fire_time
next_fire_time_with_jitter = next_fire_time + timedelta(
seconds=random.uniform(-jitter, jitter))
if next_fire_time_with_jitter < now:
# Next fire time with jitter is in the past.
# Ignore jitter to avoid false misfire.
return next_fire_time
return next_fire_time_with_jitter
return next_fire_time + timedelta(seconds=random.uniform(0, jitter))

View file

@ -45,7 +45,7 @@ class AndTrigger(BaseCombiningTrigger):
Trigger alias: ``and``
:param list triggers: triggers to combine
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
:param int|None jitter: delay the job execution by ``jitter`` seconds at most
"""
__slots__ = ()
@ -73,7 +73,7 @@ class OrTrigger(BaseCombiningTrigger):
Trigger alias: ``or``
:param list triggers: triggers to combine
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
:param int|None jitter: delay the job execution by ``jitter`` seconds at most
.. note:: Triggers that depends on the previous fire time, such as the interval trigger, may
seem to behave strangely since they are always passed the previous fire time produced by

View file

@ -16,7 +16,7 @@ class CronTrigger(BaseTrigger):
:param int|str year: 4-digit year
:param int|str month: month (1-12)
:param int|str day: day of the (1-31)
:param int|str day: day of month (1-31)
:param int|str week: ISO week (1-53)
:param int|str day_of_week: number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
:param int|str hour: hour (0-23)
@ -26,7 +26,7 @@ class CronTrigger(BaseTrigger):
:param datetime|str end_date: latest possible date/time to trigger on (inclusive)
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations (defaults
to scheduler timezone)
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
:param int|None jitter: delay the job execution by ``jitter`` seconds at most
.. note:: The first weekday is always **monday**.
"""

View file

@ -20,7 +20,7 @@ class IntervalTrigger(BaseTrigger):
:param datetime|str start_date: starting point for the interval calculation
:param datetime|str end_date: latest possible date/time to trigger on
:param datetime.tzinfo|str timezone: time zone to use for the date/time calculations
:param int|None jitter: advance or delay the job execution by ``jitter`` seconds at most.
:param int|None jitter: delay the job execution by ``jitter`` seconds at most
"""
__slots__ = 'timezone', 'start_date', 'end_date', 'interval', 'interval_length', 'jitter'

View file

@ -7,6 +7,7 @@ from calendar import timegm
from functools import partial
from inspect import isclass, ismethod
import re
import sys
from pytz import timezone, utc, FixedOffset
import six
@ -352,7 +353,10 @@ def check_callable_args(func, args, kwargs):
has_varargs = has_var_kwargs = False
try:
sig = signature(func)
if sys.version_info >= (3, 5):
sig = signature(func, follow_wrapped=False)
else:
sig = signature(func)
except ValueError:
# signature() doesn't work against every kind of callable
return