# -*- coding: utf-8 -*-
"""
This module locates certificate files in the supplied paths and parses
them. It then keeps track of the following:
- If cert is found for the first time (thus also when the daemon is started),
the cert is added to the :attr:`stapled.core.certfinder.CertFinder.scheduler`
so the :class:`~stapled.core.certparser.CertParserThread` can parse the
certificate. The file modification time is recorded so file changes can be
detected.
- If a cert is found a second time, the modification time is compared to the
recorded modification time. If it differs, if it differs, the file is added
to the scheduler for parsing again, any scheduled actions for the old file
are cancelled.
- When certificates are deleted from the paths, the entries are removed
from the cache in :attr:`stapled.core.daemon.run.models`. Any scheduled
actions for deleted files are cancelled.
The cache of parsed files is volatile so every time the process is killed
files need to be indexed again (thus files are considered "new").
"""
import threading
import time
import logging
import fnmatch
import os
import errno
import stapled
from stapled.core.excepthandler import stapled_except_handle
from stapled.core.taskcontext import StapleTaskContext
from stapled.core.certmodel import CertModel
from stapled.util.cache import cache
LOG = logging.getLogger(__name__)
[docs]class CertFinderThread(threading.Thread):
"""
This searches paths for certificate files.
When found, models are created for the certificate files, which are wrapped
in a :class:`stapled.core.taskcontext.StapleTaskContext` which are then
scheduled to be processed by the
:class:`stapled.core.certparser.CertParserThread` ASAP.
Pass ``refresh_interval=None`` if you want to run it only once (e.g. for
testing)
"""
# pylint: disable=too-many-instance-attributes
[docs] def __init__(self, *args, **kwargs):
"""
Initialise the thread with its parent :class:`threading.Thread` and its
arguments.
:kwarg dict models: A dict to maintain a model cache **(required)**.
:kwarg iter cert_paths: The paths to index **(required)**.
:kwarg stapled.scheduling.SchedulerThread scheduler: The scheduler
object where we add new parse tasks to. **(required)**.
:kwarg int refresh_interval: The minimum amount of time (s)
between search runs, defaults to 10 seconds. Set to None to run
only once **(optional)**.
:kwarg array file_extensions: An array containing the file extensions
of file types to check for certificate content **(optional)**.
"""
self.stop = False
self.models = kwargs.pop('models', None)
self.cert_paths = kwargs.pop('cert_paths', None)
self.scheduler = kwargs.pop('scheduler', None)
self.refresh_interval = kwargs.pop(
'refresh_interval', stapled.DEFAULT_REFRESH_INTERVAL
)
self.file_extensions = kwargs.pop(
'file_extensions', stapled.FILE_EXTENSIONS_DEFAULT
)
self.last_refresh = None
self.ignore = kwargs.pop('ignore', []) or []
self.recursive = kwargs.pop('recursive', False)
assert self.models is not None, \
"You need to pass a dict to hold the certificate model cache."
assert self.cert_paths is not None, \
"At least one path should be passed for indexing."
assert self.scheduler is not None, \
"Please pass a scheduler to get tasks from and add tasks to."
super(CertFinderThread, self).__init__(*args, **kwargs)
[docs] def run(self):
"""Start the certificate finder thread."""
LOG.info("Scanning paths: '%s'", "', '".join(self.cert_paths))
while not self.stop:
# Catch any exceptions within this context to protect the thread.
with stapled_except_handle():
self.refresh()
if self.refresh_interval is None:
# Stop refreshing if it is not wanted.
break
# Schedule the next refresh run..
since_last = time.time() - self.last_refresh
# Check if the last refresh took longer than the interval..
if since_last > self.refresh_interval:
# It did so start right now..
LOG.info(
"Starting a new refresh immediately because the last "
"refresh took %0.3f seconds while the minimum "
"interval is %d seconds.",
since_last,
self.refresh_interval
)
else:
# Wait the remaining time before refreshing again..
LOG.info(
"Scheduling a new refresh in %0.2f seconds because "
"the last refresh took %0.2f seconds while the "
"minimum interval is %d seconds.",
self.refresh_interval - since_last,
since_last,
self.refresh_interval
)
sleep_time = self.refresh_interval - since_last
while sleep_time > 0:
if self.stop:
break
time.sleep(1)
sleep_time = sleep_time - 1
LOG.debug("Goodbye cruel world..")
[docs] def refresh(self):
"""
Wrap up the internal :meth:`CertFinder._update_cached_certs()` and
:meth:`CertFinder._find_new_certs()` functions.
.. Note:: This method is automatically called by
:meth:`CertFinder.run()`
"""
self.last_refresh = time.time()
LOG.info("Starting a refresh run.")
self._update_cached_certs()
self._find_new_certs(self.cert_paths)
[docs] def _find_new_certs(self, paths, force_cert_path=None):
"""
Locate new files, schedule them for parsing.
:param list|tuple paths: Paths to scan for certificates.
:param str|Nonetype force_cert_path: Parent path as specified in the
CLI arguments. Necessary to link certificates found in `paths` to
any configured sockets.
:raises stapled.core.exceptions.CertFileAccessError: When the
certificate file can't be accessed.
"""
for path in paths:
if force_cert_path:
# Keep this value so we know in which directory it was found.
# Only keep the highest level, equal to what was supplied as
# an argument or in config.
cert_path = force_cert_path
else:
cert_path = path
try:
LOG.debug("Scanning path: %s", path)
dirs = []
try:
dirs = os.listdir(path)
except (OSError, IOError) as exc:
# If a path is actually a file we can still use it..
if exc.errno == errno.ENOTDIR and os.path.isfile(path):
LOG.debug("%s may be a single file", path)
# This will allow us to use our usual iteration.
dirs = [os.path.basename(path)]
path = os.path.dirname(path)
else:
raise exc
for entry in dirs:
entry = os.path.join(path, entry)
if os.path.isdir(entry):
if self.recursive:
LOG.debug("Recursing path %s", entry)
self._find_new_certs([entry], cert_path)
continue
ext = os.path.splitext(entry)[1].lstrip(".")
if ext not in self.file_extensions:
continue
if entry in self.models:
continue
if self.check_ignore(entry):
LOG.debug(
"Ignoring file %s, because it's on the ignore "
"list.",
entry
)
continue
model = CertModel(entry, cert_path=cert_path)
# Remember the model so we can compare the file later to
# see if it changed.
self.models[entry] = model
# Schedule the certificate for parsing.
context = StapleTaskContext(
task_name="parse",
model=model,
sched_time=None
)
self.scheduler.add_task(context)
except (IOError, OSError) as exc:
# If the directory is unreadable this gets printed at every
# refresh until the directory is readable. We catch this here
# so any readable directory can still be scanned.
LOG.critical(
"Can't read path: %s, reason: %s.",
path, exc
)
[docs] def _del_model(self, filename):
"""
Delete model from :attr:`stapled.core.daemon.run.models`.
This is done in a thread-safe manner, if another thread deleted it,
we should ignore the KeyError making this function omnipotent.
:param str filename: The filename of the model to forget about.
"""
try:
del self.models[filename]
except KeyError:
pass
[docs] def _update_cached_certs(self):
"""
Check for deleted or changed certificate files.
Loop through the list of files that were already found and check
whether they were deleted or changed.
If a file was modified since it was last seen, the file is added to the
scheduler to get the new certificate data parsed.
Deleted files are removed from the model cache in
:attr:`stapled.core.daemon.run.models`. Any scheduled tasks for the
model's task context are cancelled.
:raises stapled.core.exceptions.CertFileAccessError: When the
certificate file can't be accessed.
"""
deleted = []
changed = []
for filename, model in self.models.items():
if not os.path.exists(filename):
deleted.append(filename)
elif os.path.getmtime(filename) > model.modtime:
changed.append(filename)
# Purge certs that no longer exist in the cert dirs
for filename in deleted:
# Cancel any scheduled tasks for the model.
self.scheduler.cancel_by_subject(self.models[filename])
# Remove the model from cache
self._del_model(filename)
LOG.info(
"File %s was deleted, removing it from the cache.", filename)
# Re-add files that have changed, we will make a new model so the model
# is an accurate representation of what is in the cerificate file on
# disk, this is just to prevent any stale data being used in the
# process. Making the new model and scheduling a parse will make go
# through all the steps to get the certificate stapled ASAP again.
for filename in changed:
# Cancel any scheduled tasks for the model.
self.scheduler.cancel_by_subject(self.models[filename])
# Before deleting the model from cache take relevant information
# that will be lost
cert_path = self.models[filename].cert_path
# Remove the model from cache.
self._del_model(filename)
# Make a new model.
LOG.info("File %s changed, parsing it again.", filename)
new_model = CertModel(filename, cert_path)
context = StapleTaskContext(
task_name="parse", model=new_model, sched_time=None)
self.scheduler.add_task(context)
[docs] @cache(10000)
def check_ignore(self, path):
"""
Check if a file path matches any pattern in the ignore list.
:param str path: Path to match a pattern in ``self.ignore``.
"""
for pattern in self.ignore:
# Strip spaces, check if length still greater than 0
pattern = pattern.strip()
if not pattern:
continue
# If pattern starts with / it is absolute, do nothing, if not, add
# ``**`` to make fnmatch match any parent directory.
if pattern[0] != '/':
pattern = "**{}".format(pattern)
if fnmatch.fnmatch(path, pattern):
return True
return False