# Source code for jobslib.oneinstance.consul

"""
Module :mod:`jobslib.oneinstance.consul` provides :class:`ConsulLock`
lock. It is based on HashiCorp Consul, so the lock is distributed among
datacenters.
"""

import json
import logging
import signal
import os

import retrying

from consul import Consul
from objectvalidator import option

from . import BaseLock, OneInstanceWatchdogError
from ..config import ConfigGroup, RetryConfigMixin
from ..time import get_current_time, to_local, to_utc

# Names exported by ``from jobslib.oneinstance.consul import *``.
__all__ = ["ConsulLock"]

# Logger named after this module.
logger = logging.getLogger(name=__name__)

# Seconds in one day (24 h * 60 min * 60 s). It is used as the default
# and the upper bound of the lock TTL.
ONE_DAY_SECONDS = 24 * 60 * 60

class ConsulLock(BaseLock):
    """
    Consul lock implementation, provides locking among datacenters.
    When the lock expires due to TTL, :exc:`OneInstanceWatchdogError`
    is raised. It is possible to extend the lock using :meth:`refresh`.
    Lock is not extended immediately, but request for extending is made
    and lock will be extended asynchronously when **SIGALRM** is
    received. So use conservative TTL and periodically extend the lock.

    .. warning::

        :class:`ConsulLock` uses :mod:`signal` and :data:`signal.SIGALRM`
        for TTL mechanism, so don't use :data:`~!signal.SIGALRM` in your
        task. And don't use multiple instances of the :class:`ConsulLock`
        at the same time, because :data:`~!signal.SIGALRM` can't be
        shared among multiple instances.

    For using the :class:`ConsulLock` configure backend in
    :mod:`settings` (``ttl`` and ``lock_delay`` are whole seconds, i.e.
    integers):

    .. code-block:: python

        ONE_INSTANCE = {
            'backend': 'jobslib.oneinstance.consul.ConsulLock',
            'options': {
                'host': 'hostname',
                'port': 8500,
                'timeout': 1.0,
                'key': 'jobs/example/lock',
                'ttl': 60,
                'lock_delay': 15,
                'retry_max_attempts': 10,
                'retry_wait_multiplier': 50,
            },
        }

    Or use :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_HOST`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_PORT`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_TIMEOUT`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_KEY`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_TTL`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_LOCK_DELAY`,
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_RETRY_MAX_ATTEMPTS` and
    :envvar:`JOBSLIB_ONE_INSTANCE_CONSUL_RETRY_WAIT_MULTIPLIER`
    environment variables.
    """

    class OptionsConfig(RetryConfigMixin, ConfigGroup):
        """
        Consul lock options.
        """

        retry_env_prefix = 'JOBSLIB_ONE_INSTANCE_CONSUL_'

        @option(required=True, attrtype=str)
        def scheme(self):
            """
            URI scheme, in current implementation always ``http``.
            """
            return 'http'

        @option(required=True, attrtype=str)
        def host(self):
            """
            IP address or hostname of the Consul server.
            """
            host = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_HOST')
            if host:
                return host
            return self._settings.get('host', '')

        @option(required=True, attrtype=int)
        def port(self):
            """
            Port where the Consul server listening on.
            """
            port = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_PORT')
            if port:
                return int(port)
            return self._settings.get('port', 8500)

        @option(required=True, attrtype=float)
        def timeout(self):
            """
            Timeout in seconds for connect/read/write operation.
            """
            timeout = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_TIMEOUT')
            if timeout:
                return float(timeout)
            timeout = self._settings.get('timeout', 5.0)
            # Accept integer settings for a float-typed option.
            if isinstance(timeout, int):
                timeout = float(timeout)
            return timeout

        @option(required=True, attrtype=str)
        def key(self):
            """
            Key under which the lock is stored.
            """
            key = os.environ.get('JOBSLIB_ONE_INSTANCE_CONSUL_KEY')
            if key:
                return key
            # env for backward compatibility
            key = os.environ.get('JOBSLIB_ONE_INSTANCE_OPTIONS_KEY')
            if key:
                return key
            return self._settings['key']

        @option(required=True, attrtype=int)
        def ttl(self):
            """
            Maximum lock lifespan in seconds, must be between 10 seconds
            and one day. If value is omitted, default is one day (maximum
            for Consul).
            """
            if 'JOBSLIB_ONE_INSTANCE_CONSUL_TTL' in os.environ:
                ttl = int(os.environ['JOBSLIB_ONE_INSTANCE_CONSUL_TTL'])
            elif 'JOBSLIB_ONE_INSTANCE_OPTIONS_TTL' in os.environ:
                # env for backward compatibility
                ttl = int(os.environ['JOBSLIB_ONE_INSTANCE_OPTIONS_TTL'])
            else:
                ttl = self._settings.get('ttl', ONE_DAY_SECONDS)
            if ttl < 10 or ttl > ONE_DAY_SECONDS:
                raise ValueError(
                    'TTL must be between 10 and {} seconds'.format(
                        ONE_DAY_SECONDS))
            return ttl

        @option(required=True, attrtype=int)
        def lock_delay(self):
            """
            When sessions invalidation request is received, wait
            *lock_delay* seconds before session is truly invalidated.
            Value must be between 0 and 60 seconds, default is 1.
            """
            if 'JOBSLIB_ONE_INSTANCE_CONSUL_LOCK_DELAY' in os.environ:
                delay = int(os.environ.get(
                    'JOBSLIB_ONE_INSTANCE_CONSUL_LOCK_DELAY'))
            elif 'JOBSLIB_ONE_INSTANCE_LOCK_DELAY' in os.environ:
                delay = int(os.environ.get('JOBSLIB_ONE_INSTANCE_LOCK_DELAY'))
            else:
                delay = self._settings.get('lock_delay', 1)
            if delay < 0 or delay > 60:
                raise ValueError('lock_delay must be between 0 and 60 seconds')
            return delay

    def __init__(self, context, options):
        super().__init__(context, options)
        # ID of the Consul session that holds the lock, ``None`` when the
        # lock is not held by this instance.
        self._session_id = None
        # Set by :meth:`refresh`; consumed by :meth:`_alarm_handler`.
        self._refresh_lock_flag = False
        self._consul = Consul(
            scheme=self.options.scheme,
            host=self.options.host,
            port=self.options.port,
            timeout=self.options.timeout,
        )

    def acquire(self):
        """
        Try to acquire the lock.

        A new Consul session (with *ttl* and *lock_delay* from options) is
        created and a JSON record describing the owner (FQDN and the
        current time) is written under *key* with ``acquire=session``.
        On success a **SIGALRM** watchdog is armed for *ttl* seconds.

        :return: ``True`` if the lock has been acquired, ``False``
            otherwise; on failure the newly created session is destroyed.
        """
        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _create_session():
            return self._consul.session.create(
                ttl=self.options.ttl, lock_delay=self.options.lock_delay)

        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _acquire_lock(data, session_id):
            return self._consul.kv.put(
                self.options.key, data, acquire=session_id)

        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _destroy_session(session_id):
            self._consul.session.destroy(session_id)

        timestamp = get_current_time()
        record = {
            'fqdn': self.context.fqdn,
            'timestamp': timestamp,
            'time_utc': to_utc(timestamp),
            'time_local': to_local(timestamp),
        }

        session_id = _create_session()
        try:
            res = _acquire_lock(json.dumps(record), session_id)
        except Exception:
            logger.exception("Can't acquire lock")
        else:
            if res is True:
                self._session_id = session_id
                self._refresh_lock_flag = False
                # Set SIGALRM refresh handler. If lock is not released or
                # extended before ttl is reached, task will be killed.
                signal.signal(signal.SIGALRM, self._alarm_handler)
                signal.alarm(self.options.ttl)
                return True
            logger.error("Can't acquire lock")

        # Both failure paths end here: drop the session we just created.
        _destroy_session(session_id)
        return False

    def release(self):
        """
        Release the lock held by this instance and cancel the **SIGALRM**
        watchdog.

        :return: ``True`` if the lock has been released, ``False``
            otherwise (including when no lock is held by this instance).
        """
        if self._session_id is None:
            # python-consul omits the ``release`` parameter when it is
            # falsy, so the put below would become a plain write that
            # overwrites the record of whoever currently holds the lock.
            logger.error("Can't release lock, lock is not acquired")
            return False

        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _release_lock(session_id):
            return self._consul.kv.put(
                self.options.key, None, release=session_id)

        try:
            res = _release_lock(self._session_id)
        except Exception:
            logger.exception("Can't release lock")
        else:
            if res is True:
                self._session_id = None
                self._refresh_lock_flag = False
                # Cancel SIGALRM
                signal.alarm(0)
                signal.signal(signal.SIGALRM, signal.SIG_DFL)
                return True
            logger.error("Can't release lock")
        return False

    def refresh(self):
        """
        Request extending of the lock. The lock is extended
        asynchronously by :meth:`_alarm_handler` when **SIGALRM** is
        received.

        :return: always ``True``.
        """
        self._refresh_lock_flag = True
        return True

    def _alarm_handler(self, unused_signum, unused_frame):
        """
        **SIGALRM** signal handler, it is called at the end of Consuls'
        session TTL. Extend the lock if request for extending is presented,
        otherwise raise :exc:`OneInstanceWatchdogError`.
        """
        # retrying's ``stop_max_delay`` is in milliseconds whereas
        # ``lock_delay`` is in seconds; without the conversion the retry
        # window shrinks to a few milliseconds (effectively one attempt).
        @retrying.retry(
            stop_max_delay=self.options.lock_delay * 1000,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _renew_session(session_id):
            return self._consul.session.renew(session_id)

        if self._refresh_lock_flag:
            try:
                res = _renew_session(self._session_id)
            except Exception:
                logger.exception("Can't extend lock")
            else:
                if res:
                    self._refresh_lock_flag = False
                    # Restart SIGALRM
                    signal.alarm(self.options.ttl)
                    return
                logger.error("Can't extend lock")

        raise OneInstanceWatchdogError

    def get_lock_owner_info(self):
        """
        Return information about the current lock owner.

        :return: the JSON object stored under *key* (as written by
            :meth:`acquire`) decoded into a :class:`dict`, or ``None``
            when there is no value or it can't be read or decoded into
            a JSON object.
        """
        @retrying.retry(
            stop_max_attempt_number=self.options.retry_max_attempts,
            wait_exponential_multiplier=self.options.retry_wait_multiplier)
        def _get_lock_owner_info():
            # Only the second item of ``kv.get``'s result (the key data)
            # is needed.
            return self._consul.kv.get(self.options.key)[1]

        try:
            res = _get_lock_owner_info()
            if res is None or res['Value'] is None:
                return None
            owner_info = json.loads(res['Value'])
            if not isinstance(owner_info, dict):
                raise ValueError('Lock owner info is not a JSON Object')
        except Exception:
            logger.exception("Can't get lock owner info")
            # Never leak a partially decoded / invalid value to callers.
            return None
        return owner_info