Source code for jobslib.liveness.consul
"""
Module :mod:`jobslib.liveness.consul` provides :class:`ConsulLiveness`
writer.
"""
import json
import logging
import os
import retrying
from consul import Consul
from objectvalidator import option
from . import BaseLiveness
from ..config import ConfigGroup, RetryConfigMixin
__all__ = ['ConsulLiveness']
logger = logging.getLogger(__name__)
[docs]class ConsulLiveness(BaseLiveness):
"""
Consul liveness implementation. Provides exporting informations about
health state into Consul's key/value storage.
For use of :class:`ConsulLiveness` write into :mod:`settings`:
.. code-block:: python
LIVENESS = {
'backend': 'jobslib.liveness.consul.ConsulLiveness',
'options': {
'host': 'hostname',
'port': 8500,
'timeout': 1.0,
'key': 'jobs/example/liveness',
'retry_max_attempts': 10,
'retry_wait_multiplier': 50,
},
}
Or use
:envvar:`JOBSLIB_LIVENESS_CONSUL_HOST`,
:envvar:`JOBSLIB_LIVENESS_CONSUL_PORT`,
:envvar:`JOBSLIB_LIVENESS_CONSUL_TIMEOUT`,
:envvar:`JOBSLIB_LIVENESS_CONSUL_KEY`,
:envvar:`JOBSLIB_LIVENESS_CONSUL_RETRY_MAX_ATTEMPTS` and
:envvar:`JOBSLIB_LIVENESS_CONSUL_RETRY_WAIT_MULTIPLIER`
environment variables.
"""
[docs] class OptionsConfig(RetryConfigMixin, ConfigGroup):
"""
Consul liveness options.
"""
retry_env_prefix = 'JOBSLIB_LIVENESS_CONSUL_'
[docs] @option(required=True, attrtype=str)
def scheme(self):
"""
URI scheme, in current implementation always ``http``.
"""
return 'http'
[docs] @option(required=True, attrtype=str)
def host(self):
"""
IP address or hostname of the Consul server.
"""
host = os.environ.get('JOBSLIB_LIVENESS_CONSUL_HOST')
if host:
return host
return self._settings.get('host', '127.0.0.1')
[docs] @option(required=True, attrtype=int)
def port(self):
"""
Port where the Consul server listening on.
"""
port = os.environ.get('JOBSLIB_LIVENESS_CONSUL_PORT')
if port:
return int(port)
return self._settings.get('port', 8500)
[docs] @option(required=True, attrtype=float)
def timeout(self):
"""
Timeout in seconds for connect/read/write operation.
"""
timeout = os.environ.get('JOBSLIB_LIVENESS_CONSUL_TIMEOUT')
if timeout:
return float(timeout)
timeout = self._settings.get('timeout', 5.0)
if isinstance(timeout, int):
timeout = float(timeout)
return timeout
[docs] @option(required=True, attrtype=str)
def key(self):
"""
Key under which the health state is stored.
"""
key = os.environ.get('JOBSLIB_LIVENESS_CONSUL_KEY')
if key:
return key
return self._settings['key']
def __init__(self, context, options):
super().__init__(context, options)
self._consul = Consul(
scheme=self.options.scheme,
host=self.options.host,
port=self.options.port,
timeout=self.options.timeout,
)
def write(self):
@retrying.retry(
stop_max_attempt_number=self.options.retry_max_attempts,
wait_exponential_multiplier=self.options.retry_wait_multiplier)
def _write(data):
return self._consul.kv.put(self.options.key, data)
try:
state = self.get_state()
data = json.dumps(state)
if not _write(data):
logger.error("Can't write liveness state")
except Exception:
logger.exception("Can't write liveness state")
def read(self):
@retrying.retry(
stop_max_attempt_number=self.options.retry_max_attempts,
wait_exponential_multiplier=self.options.retry_wait_multiplier)
def _read():
return self._consul.kv.get(self.options.key)[1]
try:
data = _read()
if data is None:
raise KeyError(self.options.key)
record = json.loads(data['Value'])
except Exception:
logger.exception("Can't read liveness state")
raise
return record