# Source code for jobslib.oneinstance.consul

"""
Module :mod:`jobslib.oneinstance.consul` provides :class:`ConsulLock`
lock. It is based on HashiCorp Consul, so lock is distributed among
datacenters.
"""

import collections.abc
import json
import logging
import signal
import os

import retrying

from consul import Consul
from objectvalidator import option

from . import BaseLock, OneInstanceWatchdogError
from ..config import ConfigGroup, RetryConfigMixin
from ..time import get_current_time, to_local, to_utc

# Public API of this module.
__all__ = ['ConsulLock']

# Module-level logger used by ConsulLock to report lock failures.
logger = logging.getLogger(__name__)

# One day in seconds: default and upper bound for the lock TTL
# (see ConsulLock.OptionsConfig.ttl).
ONE_DAY_SECONDS = 60 * 60 * 24


class ConsulLock(BaseLock):
    """
    Consul lock implementation, provides locking among datacenters.
    When the lock expires due to TTL, :exc:`OneInstanceWatchdogError`
    is raised. It is possible to extend the lock using :meth:`refresh`.
    Lock is not extended immediately, but request for extending is made
    and lock will be extended asynchronously when **SIGALRM** is
    received. So use conservative TTL and periodically extend the lock.

    .. warning::

        :class:`ConsulLock` uses :mod:`signal` and :data:`signal.SIGALRM`
        for TTL mechanism, so don't use :data:`~!signal.SIGALRM` in your
        task. And don't use multiple instances of the :class:`ConsulLock`
        at the same time, because :data:`~!signal.SIGALRM` can't be
        shared among multiple instances.

    For using the :class:`ConsulLock` configure backend in
    :mod:`settings`:

    .. code-block:: python

        ONE_INSTANCE = {
            'backend': 'jobslib.oneinstance.consul.ConsulLock',
            'options': {
                'host': 'hostname',
                'port': 8500,
                'timeout': 1.0,
                'key': 'jobs/example/lock',
                'ttl': 60,
                'lock_delay': 15,
                'retry_max_attempts': 10,
                'retry_wait_multiplier': 50,
            },
        }

    Or use :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_HOST`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_PORT`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_TIMEOUT`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_KEY`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_TTL`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_LOCK_DELAY`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_RETRY_MAX_ATTEMPTS` and
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_RETRY_WAIT_MULTIPLIER`
    environment variables.
    """

    class OptionsConfig(RetryConfigMixin, ConfigGroup):
        """
        Consul lock options.
        """

        retry_env_prefix = 'JOBSLIB_ONE_INSTANCE_CONSUL_'

        @option(required=True, attrtype=str)
        def scheme(self):
            """
            URI scheme, in current implementation always ``http``.
            """
            return 'http'

        @option(required=True, attrtype=str)
        def host(self):
            """
            IP address or hostname of the Consul server. Default is
            ``127.0.0.1``.
            """
            host = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_HOST')
            if host:
                return host
            return self._settings.get('host', '127.0.0.1')

        @option(required=True, attrtype=int)
        def port(self):
            """
            Port the Consul server is listening on. Default is ``8500``.
            """
            port = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_PORT')
            if port:
                return int(port)
            return self._settings.get('port', 8500)

        @option(required=True, attrtype=float)
        def timeout(self):
            """
            Timeout in seconds for connect/read/write operation. Default
            is ``5.0``.
            """
            timeout = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_TIMEOUT')
            if timeout:
                return float(timeout)
            timeout = self._settings.get('timeout', 5.0)
            if isinstance(timeout, int):
                timeout = float(timeout)
            return timeout

        @option(required=True, attrtype=str)
        def key(self):
            """
            Key under which the lock is stored.
            """
            key = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_KEY')
            if key:
                return key
            # env for backward compatibility
            key = os.environ.get('JOBSLIB_ONE_INSTANCE_OPTIONS_KEY')
            if key:
                return key
            return self._settings['key']

        @option(required=True, attrtype=int)
        def ttl(self):
            """
            Maximum lock lifespan in seconds, must be between 10 seconds
            and one day. If value is omitted, default is one day (maximum
            for Consul). An integral float in settings (e.g. ``60.0``) is
            accepted and converted to :class:`int`.

            :raises ValueError: if the value is out of range.
            """
            if 'JOBSLIB_ONE_INSTANCE_CONSUL_TTL' in os.environ:
                ttl = int(os.environ['JOBSLIB_ONE_INSTANCE_CONSUL_TTL'])
            elif 'JOBSLIB_ONE_INSTANCE_OPTIONS_TTL' in os.environ:
                # env for backward compatibility
                ttl = int(os.environ['JOBSLIB_ONE_INSTANCE_OPTIONS_TTL'])
            else:
                ttl = self._settings.get('ttl', ONE_DAY_SECONDS)
                # TTL is passed to signal.alarm(), which takes whole
                # seconds; accept integral floats such as 60.0.
                if isinstance(ttl, float) and ttl.is_integer():
                    ttl = int(ttl)
            if ttl < 10 or ttl > ONE_DAY_SECONDS:
                raise ValueError(
                    'TTL must be between 10 and {} seconds'.format(
                        ONE_DAY_SECONDS))
            return ttl

        @option(required=True, attrtype=int)
        def lock_delay(self):
            """
            Lock-delay of the Consul session in seconds: after the
            session is invalidated, Consul does not allow the lock to be
            re-acquired for this period. It is also used as the time
            budget for retrying the session renewal. Value must be
            between 0 and 60 seconds, default is 1. An integral float in
            settings (e.g. ``15.0``) is accepted and converted to
            :class:`int`.

            :raises ValueError: if the value is out of range.
            """
            if 'JOBSLIB_ONE_INSTANCE_CONSUL_LOCK_DELAY' in os.environ:
                delay = int(os.environ.get(
                    'JOBSLIB_ONE_INSTANCE_CONSUL_LOCK_DELAY'))
            elif 'JOBSLIB_ONE_INSTANCE_LOCK_DELAY' in os.environ:
                delay = int(os.environ.get('JOBSLIB_ONE_INSTANCE_LOCK_DELAY'))
            else:
                delay = self._settings.get('lock_delay', 1)
                if isinstance(delay, float) and delay.is_integer():
                    delay = int(delay)
            if delay < 0 or delay > 60:
                raise ValueError('lock_delay must be between 0 and 60 seconds')
            return delay

    def __init__(self, context, options):
        super().__init__(context, options)
        # ID of the Consul session holding the lock, None when not held.
        self._session_id = None
        # Set by refresh(); consumed by the SIGALRM handler.
        self._refresh_lock_flag = False
        self._consul = Consul(
            scheme=self.options.scheme,
            host=self.options.host,
            port=self.options.port,
            timeout=self.options.timeout,
        )

    def acquire(self):
        """
        Create a Consul session and try to acquire :attr:`key` with it.
        The value stored under the key is a JSON record describing the
        owner (fqdn and timestamps). On success **SIGALRM** is armed with
        the TTL.

        :return: :data:`True` if the lock was acquired, otherwise
            :data:`False`.
        """
        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _create_session():
            return self._consul.session.create(
                ttl=self.options.ttl, lock_delay=self.options.lock_delay)

        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _acquire_lock(data, session_id):
            return self._consul.kv.put(
                self.options.key, data, acquire=session_id)

        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _destroy_session(session_id):
            self._consul.session.destroy(session_id)

        timestamp = get_current_time()
        record = {
            'fqdn': self.context.fqdn,
            'timestamp': timestamp,
            'time_utc': to_utc(timestamp),
            'time_local': to_local(timestamp),
        }

        session_id = _create_session()
        try:
            res = _acquire_lock(json.dumps(record), session_id)
        except Exception:
            logger.exception("Can't acquire lock")
        else:
            if res is True:
                self._session_id = session_id
                self._refresh_lock_flag = False
                # Set SIGALRM refresh handler. If lock is not released or
                # extended before ttl is reached, task will be killed.
                signal.signal(signal.SIGALRM, self._alarm_handler)
                signal.alarm(self.options.ttl)
                return True

        logger.error("Can't acquire lock")
        try:
            _destroy_session(session_id)
        except Exception:
            # Cleanup is best-effort: the session was created with a TTL,
            # so a failure here must not turn "lock not acquired" into a
            # crash.
            logger.exception("Can't destroy session %s", session_id)
        return False

    def release(self):
        """
        Release the lock held by this instance and cancel **SIGALRM**.

        :return: :data:`True` if the lock was released, otherwise
            :data:`False` (including when no lock is held).
        """
        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _release_lock(session_id):
            return self._consul.kv.put(
                self.options.key, None, release=session_id)

        if self._session_id is None:
            # Nothing was acquired by this instance. A put with
            # release=None presumably carries no release flag and would
            # just overwrite the current owner's record, so bail out.
            logger.error("Can't release lock, lock is not acquired")
            return False

        try:
            res = _release_lock(self._session_id)
        except Exception:
            logger.exception("Can't release lock")
        else:
            if res is True:
                self._session_id = None
                self._refresh_lock_flag = False
                # Cancel SIGALRM
                signal.alarm(0)
                signal.signal(signal.SIGALRM, signal.SIG_DFL)
                return True

        logger.error("Can't release lock")
        return False

    def refresh(self):
        """
        Request extending of the lock. The session is renewed later,
        when **SIGALRM** is received.

        :return: always :data:`True`.
        """
        self._refresh_lock_flag = True
        return True

    def _alarm_handler(self, unused_signum, unused_frame):
        """
        **SIGALRM** signal handler, called when the TTL armed by
        :func:`signal.alarm` elapses. Extend the lock if request for
        extending is present, otherwise raise
        :exc:`OneInstanceWatchdogError`.

        :raises OneInstanceWatchdogError: if the lock was not refreshed
            or the session could not be renewed.
        """
        # retrying's stop_max_delay is in milliseconds while lock_delay
        # is configured in seconds.
        @retrying.retry(
            stop_max_delay=self.options.lock_delay * 1000,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _renew_session(session_id):
            return self._consul.session.renew(session_id)

        if self._refresh_lock_flag:
            try:
                res = _renew_session(self._session_id)
            except Exception:
                logger.exception("Can't extend lock")
            else:
                if res:
                    self._refresh_lock_flag = False
                    # Restart SIGALRM
                    signal.alarm(self.options.ttl)
                    return
                logger.error("Can't extend lock")

        raise OneInstanceWatchdogError

    def get_lock_owner_info(self):
        """
        Return the JSON object stored under :attr:`key` by the lock
        owner.

        :return: the decoded owner record, or :data:`None` when the key
            is missing, empty, unreadable or does not contain a JSON
            object.
        """
        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _get_lock_owner_info():
            return self._consul.kv.get(self.options.key)[1]

        try:
            res = _get_lock_owner_info()
            if res is None or res['Value'] is None:
                return None
            owner_info = json.loads(res['Value'])
            if not isinstance(owner_info, collections.abc.Mapping):
                raise ValueError('Lock owner info is not a JSON Object')
        except Exception:
            logger.exception("Can't get lock owner info")
            # Never leak a partially validated value (e.g. a JSON list).
            return None
        return owner_info