mirror of
https://github.com/Tautulli/Tautulli.git
synced 2025-08-21 13:53:24 -07:00
Update apscheduler 3.5.0
This commit is contained in:
parent
aa844b76fc
commit
8e13bf4f93
33 changed files with 1660 additions and 561 deletions
|
@ -8,23 +8,27 @@ class JobLookupError(KeyError):
|
|||
"""Raised when the job store cannot find a job for update or removal."""
|
||||
|
||||
def __init__(self, job_id):
|
||||
super(JobLookupError, self).__init__(six.u('No job by the id of %s was found') % job_id)
|
||||
super(JobLookupError, self).__init__(u'No job by the id of %s was found' % job_id)
|
||||
|
||||
|
||||
class ConflictingIdError(KeyError):
|
||||
"""Raised when the uniqueness of job IDs is being violated."""
|
||||
|
||||
def __init__(self, job_id):
|
||||
super(ConflictingIdError, self).__init__(six.u('Job identifier (%s) conflicts with an existing job') % job_id)
|
||||
super(ConflictingIdError, self).__init__(
|
||||
u'Job identifier (%s) conflicts with an existing job' % job_id)
|
||||
|
||||
|
||||
class TransientJobError(ValueError):
|
||||
"""Raised when an attempt to add transient (with no func_ref) job to a persistent job store is detected."""
|
||||
"""
|
||||
Raised when an attempt to add transient (with no func_ref) job to a persistent job store is
|
||||
detected.
|
||||
"""
|
||||
|
||||
def __init__(self, job_id):
|
||||
super(TransientJobError, self).__init__(
|
||||
six.u('Job (%s) cannot be added to this job store because a reference to the callable could not be '
|
||||
'determined.') % job_id)
|
||||
u'Job (%s) cannot be added to this job store because a reference to the callable '
|
||||
u'could not be determined.' % job_id)
|
||||
|
||||
|
||||
class BaseJobStore(six.with_metaclass(ABCMeta)):
|
||||
|
@ -36,10 +40,11 @@ class BaseJobStore(six.with_metaclass(ABCMeta)):
|
|||
|
||||
def start(self, scheduler, alias):
|
||||
"""
|
||||
Called by the scheduler when the scheduler is being started or when the job store is being added to an already
|
||||
running scheduler.
|
||||
Called by the scheduler when the scheduler is being started or when the job store is being
|
||||
added to an already running scheduler.
|
||||
|
||||
:param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting this job store
|
||||
:param apscheduler.schedulers.base.BaseScheduler scheduler: the scheduler that is starting
|
||||
this job store
|
||||
:param str|unicode alias: alias of this job store as it was assigned to the scheduler
|
||||
"""
|
||||
|
||||
|
@ -50,13 +55,22 @@ class BaseJobStore(six.with_metaclass(ABCMeta)):
|
|||
def shutdown(self):
|
||||
"""Frees any resources still bound to this job store."""
|
||||
|
||||
def _fix_paused_jobs_sorting(self, jobs):
|
||||
for i, job in enumerate(jobs):
|
||||
if job.next_run_time is not None:
|
||||
if i > 0:
|
||||
paused_jobs = jobs[:i]
|
||||
del jobs[:i]
|
||||
jobs.extend(paused_jobs)
|
||||
break
|
||||
|
||||
@abstractmethod
|
||||
def lookup_job(self, job_id):
|
||||
"""
|
||||
Returns a specific job, or ``None`` if it isn't found..
|
||||
|
||||
The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of the returned job to
|
||||
point to the scheduler and itself, respectively.
|
||||
The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of
|
||||
the returned job to point to the scheduler and itself, respectively.
|
||||
|
||||
:param str|unicode job_id: identifier of the job
|
||||
:rtype: Job
|
||||
|
@ -75,7 +89,8 @@ class BaseJobStore(six.with_metaclass(ABCMeta)):
|
|||
@abstractmethod
|
||||
def get_next_run_time(self):
|
||||
"""
|
||||
Returns the earliest run time of all the jobs stored in this job store, or ``None`` if there are no active jobs.
|
||||
Returns the earliest run time of all the jobs stored in this job store, or ``None`` if
|
||||
there are no active jobs.
|
||||
|
||||
:rtype: datetime.datetime
|
||||
"""
|
||||
|
@ -83,11 +98,12 @@ class BaseJobStore(six.with_metaclass(ABCMeta)):
|
|||
@abstractmethod
|
||||
def get_all_jobs(self):
|
||||
"""
|
||||
Returns a list of all jobs in this job store. The returned jobs should be sorted by next run time (ascending).
|
||||
Paused jobs (next_run_time is None) should be sorted last.
|
||||
Returns a list of all jobs in this job store.
|
||||
The returned jobs should be sorted by next run time (ascending).
|
||||
Paused jobs (next_run_time == None) should be sorted last.
|
||||
|
||||
The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of the returned jobs to
|
||||
point to the scheduler and itself, respectively.
|
||||
The job store is responsible for setting the ``scheduler`` and ``jobstore`` attributes of
|
||||
the returned jobs to point to the scheduler and itself, respectively.
|
||||
|
||||
:rtype: list[Job]
|
||||
"""
|
||||
|
|
|
@ -13,7 +13,8 @@ class MemoryJobStore(BaseJobStore):
|
|||
|
||||
def __init__(self):
|
||||
super(MemoryJobStore, self).__init__()
|
||||
self._jobs = [] # list of (job, timestamp), sorted by next_run_time and job id (ascending)
|
||||
# list of (job, timestamp), sorted by next_run_time and job id (ascending)
|
||||
self._jobs = []
|
||||
self._jobs_index = {} # id -> (job, timestamp) lookup table
|
||||
|
||||
def lookup_job(self, job_id):
|
||||
|
@ -80,13 +81,13 @@ class MemoryJobStore(BaseJobStore):
|
|||
|
||||
def _get_job_index(self, timestamp, job_id):
|
||||
"""
|
||||
Returns the index of the given job, or if it's not found, the index where the job should be inserted based on
|
||||
the given timestamp.
|
||||
Returns the index of the given job, or if it's not found, the index where the job should be
|
||||
inserted based on the given timestamp.
|
||||
|
||||
:type timestamp: int
|
||||
:type job_id: str
|
||||
"""
|
||||
|
||||
"""
|
||||
lo, hi = 0, len(self._jobs)
|
||||
timestamp = float('inf') if timestamp is None else timestamp
|
||||
while lo < hi:
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
from __future__ import absolute_import
|
||||
import warnings
|
||||
|
||||
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
|
||||
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
|
||||
|
@ -19,16 +20,18 @@ except ImportError: # pragma: nocover
|
|||
|
||||
class MongoDBJobStore(BaseJobStore):
|
||||
"""
|
||||
Stores jobs in a MongoDB database. Any leftover keyword arguments are directly passed to pymongo's `MongoClient
|
||||
Stores jobs in a MongoDB database. Any leftover keyword arguments are directly passed to
|
||||
pymongo's `MongoClient
|
||||
<http://api.mongodb.org/python/current/api/pymongo/mongo_client.html#pymongo.mongo_client.MongoClient>`_.
|
||||
|
||||
Plugin alias: ``mongodb``
|
||||
|
||||
:param str database: database to store jobs in
|
||||
:param str collection: collection to store jobs in
|
||||
:param client: a :class:`~pymongo.mongo_client.MongoClient` instance to use instead of providing connection
|
||||
arguments
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the highest available
|
||||
:param client: a :class:`~pymongo.mongo_client.MongoClient` instance to use instead of
|
||||
providing connection arguments
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
|
||||
highest available
|
||||
"""
|
||||
|
||||
def __init__(self, database='apscheduler', collection='jobs', client=None,
|
||||
|
@ -42,14 +45,23 @@ class MongoDBJobStore(BaseJobStore):
|
|||
raise ValueError('The "collection" parameter must not be empty')
|
||||
|
||||
if client:
|
||||
self.connection = maybe_ref(client)
|
||||
self.client = maybe_ref(client)
|
||||
else:
|
||||
connect_args.setdefault('w', 1)
|
||||
self.connection = MongoClient(**connect_args)
|
||||
self.client = MongoClient(**connect_args)
|
||||
|
||||
self.collection = self.connection[database][collection]
|
||||
self.collection = self.client[database][collection]
|
||||
|
||||
def start(self, scheduler, alias):
|
||||
super(MongoDBJobStore, self).start(scheduler, alias)
|
||||
self.collection.ensure_index('next_run_time', sparse=True)
|
||||
|
||||
@property
|
||||
def connection(self):
|
||||
warnings.warn('The "connection" member is deprecated -- use "client" instead',
|
||||
DeprecationWarning)
|
||||
return self.client
|
||||
|
||||
def lookup_job(self, job_id):
|
||||
document = self.collection.find_one(job_id, ['job_state'])
|
||||
return self._reconstitute_job(document['job_state']) if document else None
|
||||
|
@ -59,12 +71,15 @@ class MongoDBJobStore(BaseJobStore):
|
|||
return self._get_jobs({'next_run_time': {'$lte': timestamp}})
|
||||
|
||||
def get_next_run_time(self):
|
||||
document = self.collection.find_one({'next_run_time': {'$ne': None}}, fields=['next_run_time'],
|
||||
document = self.collection.find_one({'next_run_time': {'$ne': None}},
|
||||
projection=['next_run_time'],
|
||||
sort=[('next_run_time', ASCENDING)])
|
||||
return utc_timestamp_to_datetime(document['next_run_time']) if document else None
|
||||
|
||||
def get_all_jobs(self):
|
||||
return self._get_jobs({})
|
||||
jobs = self._get_jobs({})
|
||||
self._fix_paused_jobs_sorting(jobs)
|
||||
return jobs
|
||||
|
||||
def add_job(self, job):
|
||||
try:
|
||||
|
@ -83,7 +98,7 @@ class MongoDBJobStore(BaseJobStore):
|
|||
}
|
||||
result = self.collection.update({'_id': job.id}, {'$set': changes})
|
||||
if result and result['n'] == 0:
|
||||
raise JobLookupError(id)
|
||||
raise JobLookupError(job.id)
|
||||
|
||||
def remove_job(self, job_id):
|
||||
result = self.collection.remove(job_id)
|
||||
|
@ -94,7 +109,7 @@ class MongoDBJobStore(BaseJobStore):
|
|||
self.collection.remove()
|
||||
|
||||
def shutdown(self):
|
||||
self.connection.disconnect()
|
||||
self.client.close()
|
||||
|
||||
def _reconstitute_job(self, job_state):
|
||||
job_state = pickle.loads(job_state)
|
||||
|
@ -107,11 +122,13 @@ class MongoDBJobStore(BaseJobStore):
|
|||
def _get_jobs(self, conditions):
|
||||
jobs = []
|
||||
failed_job_ids = []
|
||||
for document in self.collection.find(conditions, ['_id', 'job_state'], sort=[('next_run_time', ASCENDING)]):
|
||||
for document in self.collection.find(conditions, ['_id', 'job_state'],
|
||||
sort=[('next_run_time', ASCENDING)]):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(document['job_state']))
|
||||
except:
|
||||
self._logger.exception('Unable to restore job "%s" -- removing it', document['_id'])
|
||||
except BaseException:
|
||||
self._logger.exception('Unable to restore job "%s" -- removing it',
|
||||
document['_id'])
|
||||
failed_job_ids.append(document['_id'])
|
||||
|
||||
# Remove all the jobs we failed to restore
|
||||
|
@ -121,4 +138,4 @@ class MongoDBJobStore(BaseJobStore):
|
|||
return jobs
|
||||
|
||||
def __repr__(self):
|
||||
return '<%s (client=%s)>' % (self.__class__.__name__, self.connection)
|
||||
return '<%s (client=%s)>' % (self.__class__.__name__, self.client)
|
||||
|
|
|
@ -1,5 +1,7 @@
|
|||
from __future__ import absolute_import
|
||||
from datetime import datetime
|
||||
|
||||
from pytz import utc
|
||||
import six
|
||||
|
||||
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
|
||||
|
@ -19,14 +21,16 @@ except ImportError: # pragma: nocover
|
|||
|
||||
class RedisJobStore(BaseJobStore):
|
||||
"""
|
||||
Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's StrictRedis.
|
||||
Stores jobs in a Redis database. Any leftover keyword arguments are directly passed to redis's
|
||||
:class:`~redis.StrictRedis`.
|
||||
|
||||
Plugin alias: ``redis``
|
||||
|
||||
:param int db: the database number to store jobs in
|
||||
:param str jobs_key: key to store jobs in
|
||||
:param str run_times_key: key to store the jobs' run times in
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the highest available
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
|
||||
highest available
|
||||
"""
|
||||
|
||||
def __init__(self, db=0, jobs_key='apscheduler.jobs', run_times_key='apscheduler.run_times',
|
||||
|
@ -65,7 +69,8 @@ class RedisJobStore(BaseJobStore):
|
|||
def get_all_jobs(self):
|
||||
job_states = self.redis.hgetall(self.jobs_key)
|
||||
jobs = self._reconstitute_jobs(six.iteritems(job_states))
|
||||
return sorted(jobs, key=lambda job: job.next_run_time)
|
||||
paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
|
||||
return sorted(jobs, key=lambda job: job.next_run_time or paused_sort_key)
|
||||
|
||||
def add_job(self, job):
|
||||
if self.redis.hexists(self.jobs_key, job.id):
|
||||
|
@ -73,8 +78,10 @@ class RedisJobStore(BaseJobStore):
|
|||
|
||||
with self.redis.pipeline() as pipe:
|
||||
pipe.multi()
|
||||
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
||||
pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
|
||||
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
|
||||
self.pickle_protocol))
|
||||
if job.next_run_time:
|
||||
pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
|
||||
pipe.execute()
|
||||
|
||||
def update_job(self, job):
|
||||
|
@ -82,7 +89,8 @@ class RedisJobStore(BaseJobStore):
|
|||
raise JobLookupError(job.id)
|
||||
|
||||
with self.redis.pipeline() as pipe:
|
||||
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
||||
pipe.hset(self.jobs_key, job.id, pickle.dumps(job.__getstate__(),
|
||||
self.pickle_protocol))
|
||||
if job.next_run_time:
|
||||
pipe.zadd(self.run_times_key, datetime_to_utc_timestamp(job.next_run_time), job.id)
|
||||
else:
|
||||
|
@ -121,7 +129,7 @@ class RedisJobStore(BaseJobStore):
|
|||
for job_id, job_state in job_states:
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(job_state))
|
||||
except:
|
||||
except BaseException:
|
||||
self._logger.exception('Unable to restore job "%s" -- removing it', job_id)
|
||||
failed_job_ids.append(job_id)
|
||||
|
||||
|
|
153
lib/apscheduler/jobstores/rethinkdb.py
Normal file
153
lib/apscheduler/jobstores/rethinkdb.py
Normal file
|
@ -0,0 +1,153 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
|
||||
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
|
||||
from apscheduler.job import Job
|
||||
|
||||
try:
|
||||
import cPickle as pickle
|
||||
except ImportError: # pragma: nocover
|
||||
import pickle
|
||||
|
||||
try:
|
||||
import rethinkdb as r
|
||||
except ImportError: # pragma: nocover
|
||||
raise ImportError('RethinkDBJobStore requires rethinkdb installed')
|
||||
|
||||
|
||||
class RethinkDBJobStore(BaseJobStore):
|
||||
"""
|
||||
Stores jobs in a RethinkDB database. Any leftover keyword arguments are directly passed to
|
||||
rethinkdb's `RethinkdbClient <http://www.rethinkdb.com/api/#connect>`_.
|
||||
|
||||
Plugin alias: ``rethinkdb``
|
||||
|
||||
:param str database: database to store jobs in
|
||||
:param str collection: collection to store jobs in
|
||||
:param client: a :class:`rethinkdb.net.Connection` instance to use instead of providing
|
||||
connection arguments
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
|
||||
highest available
|
||||
"""
|
||||
|
||||
def __init__(self, database='apscheduler', table='jobs', client=None,
|
||||
pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
|
||||
super(RethinkDBJobStore, self).__init__()
|
||||
|
||||
if not database:
|
||||
raise ValueError('The "database" parameter must not be empty')
|
||||
if not table:
|
||||
raise ValueError('The "table" parameter must not be empty')
|
||||
|
||||
self.database = database
|
||||
self.table = table
|
||||
self.client = client
|
||||
self.pickle_protocol = pickle_protocol
|
||||
self.connect_args = connect_args
|
||||
self.conn = None
|
||||
|
||||
def start(self, scheduler, alias):
|
||||
super(RethinkDBJobStore, self).start(scheduler, alias)
|
||||
|
||||
if self.client:
|
||||
self.conn = maybe_ref(self.client)
|
||||
else:
|
||||
self.conn = r.connect(db=self.database, **self.connect_args)
|
||||
|
||||
if self.database not in r.db_list().run(self.conn):
|
||||
r.db_create(self.database).run(self.conn)
|
||||
|
||||
if self.table not in r.table_list().run(self.conn):
|
||||
r.table_create(self.table).run(self.conn)
|
||||
|
||||
if 'next_run_time' not in r.table(self.table).index_list().run(self.conn):
|
||||
r.table(self.table).index_create('next_run_time').run(self.conn)
|
||||
|
||||
self.table = r.db(self.database).table(self.table)
|
||||
|
||||
def lookup_job(self, job_id):
|
||||
results = list(self.table.get_all(job_id).pluck('job_state').run(self.conn))
|
||||
return self._reconstitute_job(results[0]['job_state']) if results else None
|
||||
|
||||
def get_due_jobs(self, now):
|
||||
return self._get_jobs(r.row['next_run_time'] <= datetime_to_utc_timestamp(now))
|
||||
|
||||
def get_next_run_time(self):
|
||||
results = list(
|
||||
self.table
|
||||
.filter(r.row['next_run_time'] != None) # flake8: noqa
|
||||
.order_by(r.asc('next_run_time'))
|
||||
.map(lambda x: x['next_run_time'])
|
||||
.limit(1)
|
||||
.run(self.conn)
|
||||
)
|
||||
return utc_timestamp_to_datetime(results[0]) if results else None
|
||||
|
||||
def get_all_jobs(self):
|
||||
jobs = self._get_jobs()
|
||||
self._fix_paused_jobs_sorting(jobs)
|
||||
return jobs
|
||||
|
||||
def add_job(self, job):
|
||||
job_dict = {
|
||||
'id': job.id,
|
||||
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
||||
'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
||||
}
|
||||
results = self.table.insert(job_dict).run(self.conn)
|
||||
if results['errors'] > 0:
|
||||
raise ConflictingIdError(job.id)
|
||||
|
||||
def update_job(self, job):
|
||||
changes = {
|
||||
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
||||
'job_state': r.binary(pickle.dumps(job.__getstate__(), self.pickle_protocol))
|
||||
}
|
||||
results = self.table.get_all(job.id).update(changes).run(self.conn)
|
||||
skipped = False in map(lambda x: results[x] == 0, results.keys())
|
||||
if results['skipped'] > 0 or results['errors'] > 0 or not skipped:
|
||||
raise JobLookupError(job.id)
|
||||
|
||||
def remove_job(self, job_id):
|
||||
results = self.table.get_all(job_id).delete().run(self.conn)
|
||||
if results['deleted'] + results['skipped'] != 1:
|
||||
raise JobLookupError(job_id)
|
||||
|
||||
def remove_all_jobs(self):
|
||||
self.table.delete().run(self.conn)
|
||||
|
||||
def shutdown(self):
|
||||
self.conn.close()
|
||||
|
||||
def _reconstitute_job(self, job_state):
|
||||
job_state = pickle.loads(job_state)
|
||||
job = Job.__new__(Job)
|
||||
job.__setstate__(job_state)
|
||||
job._scheduler = self._scheduler
|
||||
job._jobstore_alias = self._alias
|
||||
return job
|
||||
|
||||
def _get_jobs(self, predicate=None):
|
||||
jobs = []
|
||||
failed_job_ids = []
|
||||
query = (self.table.filter(r.row['next_run_time'] != None).filter(predicate) if
|
||||
predicate else self.table)
|
||||
query = query.order_by('next_run_time', 'id').pluck('id', 'job_state')
|
||||
|
||||
for document in query.run(self.conn):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(document['job_state']))
|
||||
except:
|
||||
self._logger.exception('Unable to restore job "%s" -- removing it', document['id'])
|
||||
failed_job_ids.append(document['id'])
|
||||
|
||||
# Remove all the jobs we failed to restore
|
||||
if failed_job_ids:
|
||||
r.expr(failed_job_ids).for_each(
|
||||
lambda job_id: self.table.get_all(job_id).delete()).run(self.conn)
|
||||
|
||||
return jobs
|
||||
|
||||
def __repr__(self):
|
||||
connection = self.conn
|
||||
return '<%s (connection=%s)>' % (self.__class__.__name__, connection)
|
|
@ -10,29 +10,38 @@ except ImportError: # pragma: nocover
|
|||
import pickle
|
||||
|
||||
try:
|
||||
from sqlalchemy import create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select
|
||||
from sqlalchemy import (
|
||||
create_engine, Table, Column, MetaData, Unicode, Float, LargeBinary, select)
|
||||
from sqlalchemy.exc import IntegrityError
|
||||
from sqlalchemy.sql.expression import null
|
||||
except ImportError: # pragma: nocover
|
||||
raise ImportError('SQLAlchemyJobStore requires SQLAlchemy installed')
|
||||
|
||||
|
||||
class SQLAlchemyJobStore(BaseJobStore):
|
||||
"""
|
||||
Stores jobs in a database table using SQLAlchemy. The table will be created if it doesn't exist in the database.
|
||||
Stores jobs in a database table using SQLAlchemy.
|
||||
The table will be created if it doesn't exist in the database.
|
||||
|
||||
Plugin alias: ``sqlalchemy``
|
||||
|
||||
:param str url: connection string (see `SQLAlchemy documentation
|
||||
<http://docs.sqlalchemy.org/en/latest/core/engines.html?highlight=create_engine#database-urls>`_
|
||||
on this)
|
||||
:param engine: an SQLAlchemy Engine to use instead of creating a new one based on ``url``
|
||||
:param str url: connection string (see
|
||||
:ref:`SQLAlchemy documentation <sqlalchemy:database_urls>` on this)
|
||||
:param engine: an SQLAlchemy :class:`~sqlalchemy.engine.Engine` to use instead of creating a
|
||||
new one based on ``url``
|
||||
:param str tablename: name of the table to store jobs in
|
||||
:param metadata: a :class:`~sqlalchemy.MetaData` instance to use instead of creating a new one
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the highest available
|
||||
:param metadata: a :class:`~sqlalchemy.schema.MetaData` instance to use instead of creating a
|
||||
new one
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
|
||||
highest available
|
||||
:param str tableschema: name of the (existing) schema in the target database where the table
|
||||
should be
|
||||
:param dict engine_options: keyword arguments to :func:`~sqlalchemy.create_engine`
|
||||
(ignored if ``engine`` is given)
|
||||
"""
|
||||
|
||||
def __init__(self, url=None, engine=None, tablename='apscheduler_jobs', metadata=None,
|
||||
pickle_protocol=pickle.HIGHEST_PROTOCOL):
|
||||
pickle_protocol=pickle.HIGHEST_PROTOCOL, tableschema=None, engine_options=None):
|
||||
super(SQLAlchemyJobStore, self).__init__()
|
||||
self.pickle_protocol = pickle_protocol
|
||||
metadata = maybe_ref(metadata) or MetaData()
|
||||
|
@ -40,18 +49,22 @@ class SQLAlchemyJobStore(BaseJobStore):
|
|||
if engine:
|
||||
self.engine = maybe_ref(engine)
|
||||
elif url:
|
||||
self.engine = create_engine(url)
|
||||
self.engine = create_engine(url, **(engine_options or {}))
|
||||
else:
|
||||
raise ValueError('Need either "engine" or "url" defined')
|
||||
|
||||
# 191 = max key length in MySQL for InnoDB/utf8mb4 tables, 25 = precision that translates to an 8-byte float
|
||||
# 191 = max key length in MySQL for InnoDB/utf8mb4 tables,
|
||||
# 25 = precision that translates to an 8-byte float
|
||||
self.jobs_t = Table(
|
||||
tablename, metadata,
|
||||
Column('id', Unicode(191, _warn_on_bytestring=False), primary_key=True),
|
||||
Column('next_run_time', Float(25), index=True),
|
||||
Column('job_state', LargeBinary, nullable=False)
|
||||
Column('job_state', LargeBinary, nullable=False),
|
||||
schema=tableschema
|
||||
)
|
||||
|
||||
def start(self, scheduler, alias):
|
||||
super(SQLAlchemyJobStore, self).start(scheduler, alias)
|
||||
self.jobs_t.create(self.engine, True)
|
||||
|
||||
def lookup_job(self, job_id):
|
||||
|
@ -64,13 +77,16 @@ class SQLAlchemyJobStore(BaseJobStore):
|
|||
return self._get_jobs(self.jobs_t.c.next_run_time <= timestamp)
|
||||
|
||||
def get_next_run_time(self):
|
||||
selectable = select([self.jobs_t.c.next_run_time]).where(self.jobs_t.c.next_run_time != None).\
|
||||
selectable = select([self.jobs_t.c.next_run_time]).\
|
||||
where(self.jobs_t.c.next_run_time != null()).\
|
||||
order_by(self.jobs_t.c.next_run_time).limit(1)
|
||||
next_run_time = self.engine.execute(selectable).scalar()
|
||||
return utc_timestamp_to_datetime(next_run_time)
|
||||
|
||||
def get_all_jobs(self):
|
||||
return self._get_jobs()
|
||||
jobs = self._get_jobs()
|
||||
self._fix_paused_jobs_sorting(jobs)
|
||||
return jobs
|
||||
|
||||
def add_job(self, job):
|
||||
insert = self.jobs_t.insert().values(**{
|
||||
|
@ -116,13 +132,14 @@ class SQLAlchemyJobStore(BaseJobStore):
|
|||
|
||||
def _get_jobs(self, *conditions):
|
||||
jobs = []
|
||||
selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).order_by(self.jobs_t.c.next_run_time)
|
||||
selectable = select([self.jobs_t.c.id, self.jobs_t.c.job_state]).\
|
||||
order_by(self.jobs_t.c.next_run_time)
|
||||
selectable = selectable.where(*conditions) if conditions else selectable
|
||||
failed_job_ids = set()
|
||||
for row in self.engine.execute(selectable):
|
||||
try:
|
||||
jobs.append(self._reconstitute_job(row.job_state))
|
||||
except:
|
||||
except BaseException:
|
||||
self._logger.exception('Unable to restore job "%s" -- removing it', row.id)
|
||||
failed_job_ids.add(row.id)
|
||||
|
||||
|
|
179
lib/apscheduler/jobstores/zookeeper.py
Normal file
179
lib/apscheduler/jobstores/zookeeper.py
Normal file
|
@ -0,0 +1,179 @@
|
|||
from __future__ import absolute_import
|
||||
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
from pytz import utc
|
||||
from kazoo.exceptions import NoNodeError, NodeExistsError
|
||||
|
||||
from apscheduler.jobstores.base import BaseJobStore, JobLookupError, ConflictingIdError
|
||||
from apscheduler.util import maybe_ref, datetime_to_utc_timestamp, utc_timestamp_to_datetime
|
||||
from apscheduler.job import Job
|
||||
|
||||
try:
|
||||
import cPickle as pickle
|
||||
except ImportError: # pragma: nocover
|
||||
import pickle
|
||||
|
||||
try:
|
||||
from kazoo.client import KazooClient
|
||||
except ImportError: # pragma: nocover
|
||||
raise ImportError('ZooKeeperJobStore requires Kazoo installed')
|
||||
|
||||
|
||||
class ZooKeeperJobStore(BaseJobStore):
|
||||
"""
|
||||
Stores jobs in a ZooKeeper tree. Any leftover keyword arguments are directly passed to
|
||||
kazoo's `KazooClient
|
||||
<http://kazoo.readthedocs.io/en/latest/api/client.html>`_.
|
||||
|
||||
Plugin alias: ``zookeeper``
|
||||
|
||||
:param str path: path to store jobs in
|
||||
:param client: a :class:`~kazoo.client.KazooClient` instance to use instead of
|
||||
providing connection arguments
|
||||
:param int pickle_protocol: pickle protocol level to use (for serialization), defaults to the
|
||||
highest available
|
||||
"""
|
||||
|
||||
def __init__(self, path='/apscheduler', client=None, close_connection_on_exit=False,
|
||||
pickle_protocol=pickle.HIGHEST_PROTOCOL, **connect_args):
|
||||
super(ZooKeeperJobStore, self).__init__()
|
||||
self.pickle_protocol = pickle_protocol
|
||||
self.close_connection_on_exit = close_connection_on_exit
|
||||
|
||||
if not path:
|
||||
raise ValueError('The "path" parameter must not be empty')
|
||||
|
||||
self.path = path
|
||||
|
||||
if client:
|
||||
self.client = maybe_ref(client)
|
||||
else:
|
||||
self.client = KazooClient(**connect_args)
|
||||
self._ensured_path = False
|
||||
|
||||
def _ensure_paths(self):
|
||||
if not self._ensured_path:
|
||||
self.client.ensure_path(self.path)
|
||||
self._ensured_path = True
|
||||
|
||||
def start(self, scheduler, alias):
|
||||
super(ZooKeeperJobStore, self).start(scheduler, alias)
|
||||
if not self.client.connected:
|
||||
self.client.start()
|
||||
|
||||
def lookup_job(self, job_id):
|
||||
self._ensure_paths()
|
||||
node_path = os.path.join(self.path, job_id)
|
||||
try:
|
||||
content, _ = self.client.get(node_path)
|
||||
doc = pickle.loads(content)
|
||||
job = self._reconstitute_job(doc['job_state'])
|
||||
return job
|
||||
except BaseException:
|
||||
return None
|
||||
|
||||
def get_due_jobs(self, now):
|
||||
timestamp = datetime_to_utc_timestamp(now)
|
||||
jobs = [job_def['job'] for job_def in self._get_jobs()
|
||||
if job_def['next_run_time'] is not None and job_def['next_run_time'] <= timestamp]
|
||||
return jobs
|
||||
|
||||
def get_next_run_time(self):
|
||||
next_runs = [job_def['next_run_time'] for job_def in self._get_jobs()
|
||||
if job_def['next_run_time'] is not None]
|
||||
return utc_timestamp_to_datetime(min(next_runs)) if len(next_runs) > 0 else None
|
||||
|
||||
def get_all_jobs(self):
|
||||
jobs = [job_def['job'] for job_def in self._get_jobs()]
|
||||
self._fix_paused_jobs_sorting(jobs)
|
||||
return jobs
|
||||
|
||||
def add_job(self, job):
|
||||
self._ensure_paths()
|
||||
node_path = os.path.join(self.path, str(job.id))
|
||||
value = {
|
||||
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
||||
'job_state': job.__getstate__()
|
||||
}
|
||||
data = pickle.dumps(value, self.pickle_protocol)
|
||||
try:
|
||||
self.client.create(node_path, value=data)
|
||||
except NodeExistsError:
|
||||
raise ConflictingIdError(job.id)
|
||||
|
||||
def update_job(self, job):
|
||||
self._ensure_paths()
|
||||
node_path = os.path.join(self.path, str(job.id))
|
||||
changes = {
|
||||
'next_run_time': datetime_to_utc_timestamp(job.next_run_time),
|
||||
'job_state': job.__getstate__()
|
||||
}
|
||||
data = pickle.dumps(changes, self.pickle_protocol)
|
||||
try:
|
||||
self.client.set(node_path, value=data)
|
||||
except NoNodeError:
|
||||
raise JobLookupError(job.id)
|
||||
|
||||
def remove_job(self, job_id):
|
||||
self._ensure_paths()
|
||||
node_path = os.path.join(self.path, str(job_id))
|
||||
try:
|
||||
self.client.delete(node_path)
|
||||
except NoNodeError:
|
||||
raise JobLookupError(job_id)
|
||||
|
||||
def remove_all_jobs(self):
|
||||
try:
|
||||
self.client.delete(self.path, recursive=True)
|
||||
except NoNodeError:
|
||||
pass
|
||||
self._ensured_path = False
|
||||
|
||||
def shutdown(self):
|
||||
if self.close_connection_on_exit:
|
||||
self.client.stop()
|
||||
self.client.close()
|
||||
|
||||
def _reconstitute_job(self, job_state):
|
||||
job_state = job_state
|
||||
job = Job.__new__(Job)
|
||||
job.__setstate__(job_state)
|
||||
job._scheduler = self._scheduler
|
||||
job._jobstore_alias = self._alias
|
||||
return job
|
||||
|
||||
def _get_jobs(self):
|
||||
self._ensure_paths()
|
||||
jobs = []
|
||||
failed_job_ids = []
|
||||
all_ids = self.client.get_children(self.path)
|
||||
for node_name in all_ids:
|
||||
try:
|
||||
node_path = os.path.join(self.path, node_name)
|
||||
content, _ = self.client.get(node_path)
|
||||
doc = pickle.loads(content)
|
||||
job_def = {
|
||||
'job_id': node_name,
|
||||
'next_run_time': doc['next_run_time'] if doc['next_run_time'] else None,
|
||||
'job_state': doc['job_state'],
|
||||
'job': self._reconstitute_job(doc['job_state']),
|
||||
'creation_time': _.ctime
|
||||
}
|
||||
jobs.append(job_def)
|
||||
except BaseException:
|
||||
self._logger.exception('Unable to restore job "%s" -- removing it' % node_name)
|
||||
failed_job_ids.append(node_name)
|
||||
|
||||
# Remove all the jobs we failed to restore
|
||||
if failed_job_ids:
|
||||
for failed_id in failed_job_ids:
|
||||
self.remove_job(failed_id)
|
||||
paused_sort_key = datetime(9999, 12, 31, tzinfo=utc)
|
||||
return sorted(jobs, key=lambda job_def: (job_def['job'].next_run_time or paused_sort_key,
|
||||
job_def['creation_time']))
|
||||
|
||||
def __repr__(self):
|
||||
self._logger.exception('<%s (client=%s)>' % (self.__class__.__name__, self.client))
|
||||
return '<%s (client=%s)>' % (self.__class__.__name__, self.client)
|
Loading…
Add table
Add a link
Reference in a new issue