# -*- coding: utf-8 -*-
"""
This is a general purpose scheduler. It does best effort scheduling and
execution of expired items in the order they are added. This also means that
there is no guarantee the tasks will be executed on time every time, in fact
they will always be late, even if just by milliseconds. If you need it to be
done on time, you schedule it early, but remember that it will still be best
effort.
The way this scheduler is supposed to be used is to add a scheduling queue,
then you can add tasks to the queue to either be put in a task queue ASAP, or
at an absolute or relative time in the future. The queue should be consumed by
a worker thread.
This module defines the following objects:
- :class:`stapled.scheduling.ScheduledTaskContext`
A context that wraps around any data you want to pass to the scheduler and
which will be added to the task queue when the schedule time expires.
- :class:`stapled.scheduling.SchedulerThread`
An object that is capable of scheduling and unscheduling tasks that you
can define with :class:`stapled.scheduling.ScheduledTaskContext`.
"""
import threading
import logging
import datetime
from queue import Queue
import time
from collections import defaultdict
#: Module-level logger, named after this module.
LOG = logging.getLogger(__name__)
class ScheduledTaskContext(object):
    """
    A context for scheduled tasks, this context can be updated with an
    exception count for the last exception, so it can be re-scheduled if it is
    the appropriate action.
    """

    # pylint: disable=too-few-public-methods

    def __init__(self, task_name, subject, sched_time=None, **attributes):
        """
        Initialise a :class:`~stapled.scheduling.ScheduledTaskContext` with a
        task name, subject and optional scheduled time. Any remaining keyword
        arguments are set as attributes of the task context.

        :param str task_name: A task name corresponding to an existing queue
            in the target scheduler.
        :param obj subject: A subject for the context instance this can be
            whatever object you want to pass along to the worker.
        :param datetime.datetime|int sched_time: Absolute time
            (datetime.datetime object) or relative time in seconds (int) to
            schedule the task. ``None`` (the default) means: no schedule time.
        :param kwargs attributes: Any additional data you want to assign to
            the context, avoid using names already defined in the context:
            ``scheduler``, ``task_name``, ``subject``, ``sched_time``,
            ``reschedule``.
        :raises AttributeError: If one of the extra attributes uses a name
            that already exists on the context.
        """
        #: This attribute will be set automatically when the context is passed
        #: to a scheduler.
        self.scheduler = None
        self.task_name = task_name
        self.subject = subject
        self.sched_time = sched_time
        for attr, value in attributes.items():
            # Check the instance, not the class: ``scheduler`` and friends are
            # instance attributes set above and are invisible on the class.
            if hasattr(self, attr):
                raise AttributeError(
                    "Can't set \"{}\" it's a reserved attribute name.".format(
                        attr)
                )
            setattr(self, attr, value)

    def reschedule(self, sched_time=None):
        """
        Reschedule this context itself.

        The new ``sched_time`` is stored on the context and the context is
        handed back to the scheduler it was last added to.

        :param datetime.datetime sched_time: When should this context be added
            back to the task queue
        :raises AttributeError: If this context was never added to a
            scheduler.
        """
        self.sched_time = sched_time
        # Test explicitly instead of catching AttributeError around the call,
        # so errors raised inside the scheduler are not mistaken for this one.
        if self.scheduler is None:
            raise AttributeError(
                "This context was never added to a queue before.")
        self.scheduler.add_task(self)

    def __repr__(self):
        return "<ScheduledTaskContext {}: {}>".format(
            self.task_name, self.subject)
class SchedulerThread(threading.Thread):
    """
    This object can be used to schedule tasks for contexts.

    The context should be a
    :class:`~stapled.scheduling.ScheduledTaskContext` or an extension of it.
    When the scheduled time has *passed*, the context will be added back to
    the internal task queue(s), where it can be consumed by a worker thread.

    When a task is scheduled you can choose to have it added to the task queue
    ASAP or at a specified absolute or relative point in time. If you add it
    with an absolute time in the past, or a negative relative number, it will
    be added to the task queue the first time the scheduler checks expired
    tasks schedule times. If you want to run a task ASAP, you probably don't
    want that, you should pass ``sched_time=None`` instead, it will bypass the
    scheduling mechanism and place your task directly into the worker queue.
    """

    def __init__(self, *args, **kwargs):
        """
        Initialise the thread's arguments and its parent
        :class:`threading.Thread`.

        :kwarg iterable queues: A list, tuple or any iterable that returns
            strings that should be the names of queues.
        :kwarg int|float sleep: The sleep time in seconds between checking the
            expired items in the queue (default=1)
        :raises KeyError: If the queue name is already taken (only when queues
            kwarg is used).
        """
        # Strip our own keyword arguments before threading.Thread sees them.
        queues = kwargs.pop('queues', None)
        self.sleep = kwargs.pop('sleep', 1)
        super(SchedulerThread, self).__init__(*args, **kwargs)
        #: Set to True to make :meth:`run` return after the current cycle.
        self.stop = False
        self._queues = {}
        #: The schedule contains items indexed by time.
        self.schedule = defaultdict(list)
        #: Keeping the tasks in reverse order helps for faster unscheduling.
        self.scheduled_by_context = {}
        #: Keeping the tasks per queue name helps faster queue deletion.
        self.scheduled_by_queue = {}
        #: To allow removing by subject we keep the scheduled tasks by subject.
        self.scheduled_by_subject = defaultdict(list)
        for queue_ in queues or ():
            self.add_queue(queue_)

    def add_queue(self, name, max_size=0):
        """
        Add a scheduled queue to the scheduler.

        :param str name: A unique name for the queue.
        :param int max_size: Maximum queue depth, [default=0 (unlimited)].
        :raises KeyError: If the queue name is already taken.
        """
        if name in self._queues:
            raise KeyError(
                "A queue with name {} already exists.".format(name))
        self._queues[name] = Queue(max_size)
        self.scheduled_by_queue[name] = []

    def remove_queue(self, name):
        """
        Remove a scheduled queue from the scheduler.

        Every task that is still scheduled for this queue is unscheduled,
        tasks of the same subject in other queues are left alone.

        :param str name: The name of the existing queue.
        :raises KeyError: If the queue doesn't exist.
        """
        if name not in self._queues:
            raise KeyError(
                "A queue with name {} doesn't exist.".format(name))
        # Iterate over a copy, _unindex() mutates the per-queue list.
        for ctx in list(self.scheduled_by_queue[name]):
            self._unindex(ctx, self.scheduled_by_context.get(ctx))
        del self.scheduled_by_queue[name]
        del self._queues[name]

    def add_task(self, ctx):
        """
        Add a :class:`~stapled.scheduling.ScheduledTaskContext` to be added
        to the task queue either ASAP, or at a specific time.

        If the context was already scheduled, the scheduled task will be
        cancelled before scheduling the new task.

        A falsy ``ctx.sched_time`` (``None`` or ``0``) puts the context on the
        task queue immediately; this uses a blocking ``put`` so it waits while
        a size limited queue is full. A number is taken as seconds from now,
        anything else is used as an absolute time and is later compared with
        ``datetime.datetime.now()``.

        :param ScheduledTaskContext ctx: A context containing data for a
            worker thread.
        :raises TypeError: If the passed context is not a
            :class:`~stapled.scheduling.ScheduledTaskContext`
        :raises KeyError: If the task queue doesn't exist.
        """
        if not isinstance(ctx, ScheduledTaskContext):
            raise TypeError(
                "Passed context is not an instance of ScheduledTaskContext")
        if ctx.task_name not in self._queues:
            raise KeyError(
                "Queue with task name {} doesn't exist.".format(
                    ctx.task_name))
        ctx.scheduler = self
        if not ctx.sched_time:
            # Run scheduled tasks ASAP by adding it to the queue.
            self._queues[ctx.task_name].put(ctx)
            return
        if isinstance(ctx.sched_time, (int, float)):
            # Convert relative time in seconds to absolute time
            ctx.sched_time = datetime.datetime.now() + \
                datetime.timedelta(seconds=ctx.sched_time)
        if ctx in self.scheduled_by_context:
            LOG.warning("Task %s was already scheduled, unscheduling.", ctx)
            self.cancel_task(ctx)
        # Register the context in the schedule and in every reverse index.
        self.scheduled_by_context[ctx] = ctx.sched_time
        self.scheduled_by_queue[ctx.task_name].append(ctx)
        self.schedule[ctx.sched_time].append(ctx)
        self.scheduled_by_subject[ctx.subject].append(ctx)
        LOG.debug(
            "Scheduled %s at %s",
            ctx, ctx.sched_time.strftime('%Y-%m-%d %H:%M:%S'))

    def cancel_task(self, ctx):
        """
        Remove a task from the scheduler.

        .. Note:: Tasks that were already queued for a worker to process can't
            be canceled anymore.

        :param ScheduledTaskContext ctx: A context containing data for a
            worker thread.
        :return bool: True for successfully cancelled task or False.
        """
        try:
            # Find out when it was scheduled
            sched_time = self.scheduled_by_context[ctx]
        except KeyError:
            LOG.warning("Can't unschedule, %s wasn't scheduled.", ctx)
            return False
        self._unindex(ctx, sched_time)
        return True

    def get_task(self, task_name, blocking=True, timeout=None):
        """
        Get a task context from the task queue ``task_name``.

        :param str task_name: Task name that refers to an existing scheduler
            queue.
        :param bool blocking: Wait until there is something to return from the
            queue.
        :param int|float timeout: Passed on to the queue's ``get`` together
            with ``blocking``.
        :raises queue.Empty: If the underlying task queue is empty and
            blocking is False or the timeout expires.
        :raises KeyError: If the task queue does not exist.
        """
        if task_name not in self._queues:
            raise KeyError(
                "Queue with task name {} doesn't exist.".format(task_name))
        return self._queues[task_name].get(blocking, timeout)

    def task_done(self, task_name):
        """
        Mark a task done on a queue, this ups the queue's counter of completed
        tasks.

        :param str task_name: The task queue name.
        :raises KeyError: If the task queue does not exist.
        """
        if task_name not in self._queues:
            raise KeyError(
                "Queue with task name {} doesn't exist.".format(task_name))
        return self._queues[task_name].task_done()

    def run(self):
        """
        Start the scheduler thread.

        Queues expired tasks, then sleeps ``self.sleep`` seconds, until
        ``self.stop`` is set to a true value.
        """
        LOG.info("Started a scheduler thread.")
        while not self.stop:
            self._run()
            time.sleep(self.sleep)
        LOG.debug("Goodbye cruel world..")

    def run_all(self):
        """
        Run all tasks currently queued regardless schedule time.
        """
        self._run(True)

    def _run(self, all_tasks=False):
        """
        Queue all scheduled tasks that have a scheduled time <= now.

        :param bool all_tasks: Queue every scheduled task, also the ones that
            are not due yet.
        """
        now = datetime.datetime.now()
        # Take a copy of all sched_time keys first, so tasks that get added
        # while we are busy can't change the dict during iteration.
        todo = list(self.schedule)
        if not all_tasks:
            # Only scheduled before or at now, default
            todo = [slot for slot in todo if slot <= now]
        for sched_time in todo:
            # The slot can be gone if its tasks were cancelled meanwhile.
            for ctx in self.schedule.pop(sched_time, ()):
                LOG.debug("Adding %s to the %s queue.", ctx, ctx.task_name)
                # Remove from the reverse indexes
                self._unindex(ctx, sched_time)
                self._queues[ctx.task_name].put(ctx)
                lateness = datetime.datetime.now() - sched_time
                # total_seconds() also accounts for days and is negative for
                # tasks that run_all() queued ahead of time.
                seconds_late = lateness.total_seconds()
                if seconds_late < 1:
                    late = ''
                elif seconds_late < 60:
                    late = " {} seconds late".format(int(seconds_late))
                else:
                    late = " {} late".format(lateness)
                LOG.debug(
                    "Queued %s at %s%s",
                    ctx, now.strftime('%Y-%m-%d %H:%M:%S'), late)

    def cancel_by_subject(self, subject):
        """
        Cancel scheduled tasks by the task's context's subject.

        This comes down to: delete anything from the scheduler that relates to
        my object `X`.

        :param obj subject: The object you want all scheduled tasks cancelled
            for.
        """
        # Use get() so unknown subjects don't create an entry, and iterate a
        # copy because cancel_task() removes items from the original list.
        for ctx in list(self.scheduled_by_subject.get(subject, ())):
            self.cancel_task(ctx)

    def _unindex(self, ctx, sched_time):
        """
        Remove ``ctx`` from the schedule slot ``sched_time`` and from every
        reverse index, entries that are missing are skipped.
        """
        self.scheduled_by_context.pop(ctx, None)
        self._discard(self.schedule, sched_time, ctx)
        # The per-queue list exists as long as the queue does, keep it.
        self._discard(self.scheduled_by_queue, ctx.task_name, ctx, prune=False)
        self._discard(self.scheduled_by_subject, ctx.subject, ctx)

    @staticmethod
    def _discard(index, key, ctx, prune=True):
        """
        Remove ``ctx`` from the list ``index[key]`` if it is in there and,
        with ``prune``, delete the key once its list is empty.
        """
        items = index.get(key)
        if items is None:
            return
        if ctx in items:
            items.remove(ctx)
        if prune and not items:
            del index[key]