Source code for stapled.core.stapleadder

# -*- coding: utf-8 -*-
"""
Module for adding OCSP Staples to a running HAProxy instance.
"""
import threading
import logging
import socket
import errno
import queue
from io import StringIO
from stapled.core.excepthandler import stapled_except_handle
import stapled.core.exceptions

try:
    _ = BrokenPipeError
except NameError:
    BrokenPipeError = socket.error

LOG = logging.getLogger(__name__)
SOCKET_BUFFER_SIZE = 1024
SOCKET_TIMEOUT = 86400


[docs]class StapleAdder(threading.Thread): """ Add OCSP staples to a running HAProxy instance by sending it over a socket. It runs a thread that keeps connections to sockets open for each of the supplied haproxy sockets. Code from `collectd haproxy connection`_ under the MIT license, was used for inspiration. Tasks are taken from the :class:`stapled.scheduling.SchedulerThread`, as soon as a task context is received, an OCSP response is read from the model within it, it is added to a HAProxy socket found in self.socks[<certificate directory>]. .. _collectd haproxy connection: https://github.com/wglass/collectd-haproxy/blob/master/collectd_haproxy/ connection.py """ #: The name of this task in the scheduler TASK_NAME = 'proxy-add' #: The haproxy socket command to add OCSP staples. Use string.format to add #: the base64 encoded OCSP staple OCSP_ADD = 'set ssl ocsp-response {}' #: Predefines commands to send to sockets just after opening them. CONNECT_COMMANDS = [ "prompt", "set timeout cli {}".format(SOCKET_TIMEOUT) ]
[docs] def __init__(self, *args, **kwargs): """ Initialise the thread and its parent :class:`threading.Thread`. :kwarg dict haproxy_socket_mapping: A mapping from a directory (typically the directory containing TLS certificates) to a HAProxy socket that serves certificates from that directory. These sockets are used to communicate new OCSP staples to HAProxy, so it does not have to be restarted. :kwarg stapled.scheduling.SchedulerThread scheduler: The scheduler object where we can get "haproxy-adder" tasks from **(required)**. """ self.stop = False LOG.debug("Starting StapleAdder thread") self.scheduler = kwargs.pop('scheduler', None) self.haproxy_socket_mapping = kwargs.pop( 'haproxy_socket_mapping', None ) assert self.scheduler is not None, \ "Please pass a scheduler to get and add proxy-add tasks." assert self.haproxy_socket_mapping is not None, \ "The StapleAdder needs a haproxy_socket_mapping dict" self.socks = {} for paths in self.haproxy_socket_mapping.values(): for path in paths: with stapled_except_handle(): self._open_socket(path) super(StapleAdder, self).__init__(*args, **kwargs)
[docs] def _re_open_socket(self, path): """ Re-open socket located at path, and return the socket. Closes open sockets and wraps appropriate logging around the ``_open_socket`` method. :param str path: A valid HAProxy socket path. :return socket.socket: An open socket. :raises :exc:stapled.core.exceptions.SocketError: when the socket can not be opened. """ # Try to re-open the socket. If that doesn't work, that # will raise a :exc:`~stapled.core.exceptions.SocketError` LOG.info("Re-opening socket %s", path) try: sock = self.socks[path] sock.close() except (KeyError, UnboundLocalError): # Socket not openend, no need to close anything. pass # Open socket again.. return self._open_socket(path)
[docs] def _open_socket(self, path): """ Open socket located at path, and return the socket. Subsequently it asks for a prompt to keep the socket connection open, so several commands can be sent without having to close and re-open the socket. :param str path: A valid HAProxy socket path. :return socket.socket: An open socket. :raises :exc:stapled.core.exceptions.SocketError: when the socket can not be opened. """ sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) try: sock.connect(path) result = [] for command in self.CONNECT_COMMANDS: result.extend(self._send(sock, command)) # Results (index 1) come per path (index 0), we need only results result = [res[1] for res in result] # Indented and on separate lines or None if an empty list result = "\n\t{}".format("\n\t".join(result)) if result else "None" LOG.debug("Opened prompt with result: %s", result) self.socks[path] = sock return sock except (OSError, IOError) as exc: raise stapled.core.exceptions.SocketError( "Could not initialize StapleAdder with socket {}: {}".format( path, exc ) )
[docs] def __del__(self): """Close the sockets on exit.""" for sock in self.socks.values(): sock.close()
[docs] def run(self): """ Send any commands that enter the command queue. This is the stapleadder thread's main loop. """ LOG.info("Started an OCSP adder thread.") while not self.stop: try: context = self.scheduler.get_task(self.TASK_NAME, timeout=0.25) model = context.model LOG.debug("Sending staple for cert:'%s'", model) # Open the exception handler context to run tasks likely to # fail with stapled_except_handle(context): self.add_staple(model) self.scheduler.task_done(self.TASK_NAME) except queue.Empty: pass LOG.debug("Goodbye cruel world..")
[docs] def add_staple(self, model): """ Create and send base64 encoded OCSP staple to the HAProxy. :param model: An object that has a binary string `ocsp_staple` in it and a filename `filename`. """ command = self.OCSP_ADD.format(model.ocsp_staple.base64) LOG.debug("Setting OCSP staple with command '%s'", command) paths = self.haproxy_socket_mapping[model.cert_path] if not paths: LOG.debug("No socket set for %s", model.filename) return responses = self.send(paths, command) for path, response in responses: if response != 'OCSP Response updated!': raise stapled.core.exceptions.StapleAdderBadResponse( "Bad HAProxy response: '{}' from socket {}".format( response, path ) )
[docs] @staticmethod def _send(sock, command): """ Send the command through the ``socket`` and handle response. :param list sock: An already opened socket. :param str command: String with the HAProxy command. For a list of possible commands, see the `haproxy documentation`_ :return list: List of tuples containing path and response from HAProxy. :raises IOError if an error occurs and it's not errno.EAGAIN or errno.EINTR .. _haproxy documentation: http://haproxy.tech-notes.net/9-2-unix-socket-commands/ """ sock.sendall((command + "\n").encode()) buff = StringIO() # Get new response. while True: try: chunk = sock.recv(SOCKET_BUFFER_SIZE) if chunk: decoded_chunk = chunk.decode('ascii') buff.write(decoded_chunk) # TODO: Find out what happens if several threads # are talking to HAProxy on this socket if '> ' in decoded_chunk: break else: break except IOError as err: if err.errno not in (errno.EAGAIN, errno.EINTR): raise # Strip *all* \n, > and space characters from the end response = buff.getvalue().strip('\n> ') buff.close() return response
[docs] def send(self, paths, command): """ Send the command through the sockets at ``paths``. :param str|list paths: The path(s) to the socket(s) which should already be open. :param str command: String with the HAProxy command. For a list of possible commands, see the `haproxy documentation`_ :return list: List of tuples containing path and response from HAProxy. :raises IOError if an error occurs and it's not errno.EAGAIN or errno.EINTR .. _haproxy documentation: http://haproxy.tech-notes.net/9-2-unix-socket-commands/ """ # Empty buffer first, it's possible that other commands have been fired # to the same socket, we don't want the response to those commands in # our response string. # FIXME: This would be nice, but is tricky because the socket seems to # close if the recv call times out. Otherwise the socket stays open but # the recv call is blocking... # If this problem occurs, the easiest way is probably to open a socket # each time we want to communicate... # while True: # try: # chunk = self.socks[path].recv(SOCKET_BUFFER_SIZE) # if not chunk: # break # except IOError as err: # if err.errno not in (errno.EAGAIN, errno.EINTR): # raise # else: # break with stapled_except_handle(): responses = [] if not isinstance(paths, (list, tuple)): paths = [paths] for path in paths: try: sock = self.socks[path] response = self._send(sock, "{}\n".format(command)) except (BrokenPipeError, KeyError): sock = self._re_open_socket(path) response = self._send(sock, "{}\n".format(command)) LOG.debug("Received HAProxy response '%s'", response) responses.append((path, response)) return responses