"""
Module :mod:`jobslib.config` provides base class which encapsulates
configuration.
"""
import json
import logging.config
import os
from objectvalidator import option, OptionsContainer
from .context import Context
from .imports import import_object
from .logging import BASE_LOGGING
# Public names exported by a star-import of this module.
__all__ = ['Config', 'ConfigGroup']
class ConfigGroup(OptionsContainer):
    """
    Container for a group of (nested) configuration options.

    During initialization the values of all methods decorated by the
    :class:`option` decorator are read, so a successfully initialized
    instance has all of its options validated and cached.
    """

    def initialize(self, *args, **unused_kwargs):
        """
        Initialize instance attributes. You can override this method in
        the subclasses.

        Exactly two positional arguments are expected: *settings* and
        *args_parser*. They are stored as :attr:`_settings` and
        :attr:`_args_parser`. Keyword arguments are ignored.
        """
        # Unpack first so a wrong number of arguments fails before any
        # attribute is assigned.
        settings, args_parser = args
        self._settings = settings
        self._args_parser = args_parser

    @property
    def as_kwargs(self):
        """
        Return all options as a :class:`dict` mapping option name to
        option value.
        """
        kwargs = {}
        for opt_name in option.get_option_names(self):
            kwargs[opt_name] = getattr(self, opt_name)
        return kwargs
class Config(OptionsContainer):
    """
    Class which encapsulates configuration. It joins configuration values
    from :mod:`settings` module and from command line. :mod:`settings` is
    Python module defined by either :envvar:`JOBSLIB_SETTINGS_MODULE`
    environment variable or :option:`-s/--settings` command line argument.
    *args_parser* is instance of the :class:`argparse.Namespace`. Both values
    are available on class, *settings* as a :attr:`_settings` attribute and
    *args_parser* as an :attr:`_args_parser` attribute.

    Configuration options are placed on class as methods decorated by
    :class:`option` decorator. During class initialization all decorated
    methods are read, so it implies values validation and caching. If you
    want nested configurations options, use :class:`ConfigGroup`. Reading
    value of the nested class during initialization implies the same
    mechanism, so all configuration will be validated and cached
    recursively.

    Example of the custom :class:`Config` class:

    .. code-block:: python

        import os

        from jobslib import Config, ConfigGroup, option

        class AuthServiceConfig(ConfigGroup):

            @option(required=True, attrtype=str)
            def uri(self):
                # First try reading value from command line
                uri = self._args_parser.auth_service_uri
                if uri:
                    return uri

                # Then try reading value from environment variable
                uri = os.environ.get('MYAPP_AUTH_SERVICE_URI')
                if uri is not None:
                    return uri

                # Finally try reading value from settings
                return self._settings['uri']

        class MyAppConfig(Config):

            @option
            def auth_service(self):
                return AuthServiceConfig(
                    self._settings['AUTH_SERVICE'], self._args_parser)

    And write into :mod:`settings` module:

    .. code-block:: python

        CONFIG_CLASS = 'myapp.config.MyAppConfig'

        AUTH_SERVICE = {
            'uri': 'http://example.com/api/v1/auth',
        }

    Configuration options are available on the :class:`Config` as
    attributes. If any configuration value is not valid, exception will
    be raised and instance will not be created.

    .. code-block:: python

        >>> context.config.auth_service.uri
        'http://example.com/api/v1/auth'
    """

    def __init__(self, settings, args_parser, task_cls):
        # The attributes are assigned before the base initializer is
        # called because the option methods below read them.
        self._settings = settings
        self._args_parser = args_parser
        self._task_cls = task_cls
        super().__init__()

    def __repr__(self):
        return "<{}.{}: {:#x}>".format(
            self.__class__.__module__, self.__class__.__name__, id(self)
        )

    def initialize(self, *args, **kwargs):
        """
        Initialize instance attributes. You can override this method in
        the subclasses.
        """
        pass

    def _configure_logging(self):
        """
        Configure Python's logging according to configuration stored in the
        :attr:`logging` property.
        """
        logging.config.dictConfig(self.logging)

    @option
    def logging(self):
        """
        Python's logging configuration or :data:`BASE_LOGGING` if value is
        not defined. :data:`BASE_LOGGING` allows :data:`~logging.INFO`
        and higher leveled messages and forwards them onto console. Format
        is :func:`logging.config.dictConfig`.

        A non-empty :envvar:`JOBSLIB_LOGGING` environment variable is
        parsed as JSON and takes precedence over ``LOGGING`` in settings.
        """
        logging_cfg = os.environ.get('JOBSLIB_LOGGING')
        if logging_cfg:
            return json.loads(logging_cfg)
        return getattr(self._settings, 'LOGGING', BASE_LOGGING)

    @option
    def context_class(self):
        """
        Context class, either :class:`~jobslib.Context` class or subclass.

        Imported from the ``CONTEXT_CLASS`` settings value when it is
        defined and non-empty, otherwise :class:`~jobslib.Context`.
        """
        context_cls_name = getattr(self._settings, 'CONTEXT_CLASS', '')
        if context_cls_name:
            context_class = import_object(context_cls_name)
        else:
            context_class = Context
        return context_class

    @option
    def task_class(self):
        """
        Task class, subclass of the :class:`~jobslib.BaseTask`.
        """
        return self._task_cls

    @option
    def run_once(self):
        """
        :class:`!bool` that indicates that task will be run only once.

        Lookup order: command line argument, :envvar:`JOBSLIB_RUN_ONCE`
        environment variable (an integer string), ``RUN_ONCE`` in settings,
        then :data:`!False`.
        """
        if self._args_parser.run_once is not None:
            return self._args_parser.run_once

        run_once = os.environ.get('JOBSLIB_RUN_ONCE')
        if run_once:
            return bool(int(run_once))

        return getattr(self._settings, 'RUN_ONCE', False)

    @option(attrtype=int)
    def sleep_interval(self):
        """
        Sleep interval in seconds after task is done.

        Lookup order: command line argument,
        :envvar:`JOBSLIB_SLEEP_INTERVAL` environment variable,
        ``SLEEP_INTERVAL`` in settings, then ``0``.

        :raises ValueError: if the resulting value is negative.
        """
        if self._args_parser.sleep_interval is not None:
            sleep_interval = self._args_parser.sleep_interval
        else:
            sleep_interval = os.environ.get('JOBSLIB_SLEEP_INTERVAL')
            if sleep_interval:
                sleep_interval = int(sleep_interval)
            else:
                sleep_interval = getattr(self._settings, 'SLEEP_INTERVAL', 0)

        if sleep_interval < 0:
            # The message names the option that actually failed; it used
            # to be a copy of the run_interval message.
            raise ValueError('Sleep interval may not be less than 0')
        return sleep_interval

    @option(attrtype=int)
    def run_interval(self):
        """
        Run interval in seconds. If task is run longer than this interval,
        next loop is run immediately after task is done.

        Lookup order: command line argument,
        :envvar:`JOBSLIB_RUN_INTERVAL` environment variable,
        ``RUN_INTERVAL`` in settings, then ``0``.

        :raises ValueError: if the resulting value is negative.
        """
        if self._args_parser.run_interval is not None:
            run_interval = self._args_parser.run_interval
        else:
            run_interval = os.environ.get('JOBSLIB_RUN_INTERVAL')
            if run_interval:
                run_interval = int(run_interval)
            else:
                run_interval = getattr(self._settings, 'RUN_INTERVAL', 0)

        if run_interval < 0:
            raise ValueError('Run interval may not be less than 0')
        return run_interval

    @option
    def release_on_error(self):
        """
        :class:`!bool` that indicates that lock will be released on task
        error.

        Lookup order: command line argument,
        :envvar:`JOBSLIB_RELEASE_ON_ERROR` environment variable (an integer
        string), ``RELEASE_ON_ERROR`` in settings, then :data:`!False`.
        """
        if self._args_parser.release_on_error is not None:
            return self._args_parser.release_on_error

        release_on_error = os.environ.get('JOBSLIB_RELEASE_ON_ERROR')
        if release_on_error:
            return bool(int(release_on_error))

        return getattr(self._settings, 'RELEASE_ON_ERROR', False)

    @option
    def keep_lock(self):
        """
        :class:`!bool` that indicates that lock will be kept during sleeping.

        Lookup order: command line argument, :envvar:`JOBSLIB_KEEP_LOCK`
        environment variable (an integer string), ``KEEP_LOCK`` in
        settings, then :data:`!False`.
        """
        if self._args_parser.keep_lock is not None:
            return self._args_parser.keep_lock

        keep_lock = os.environ.get('JOBSLIB_KEEP_LOCK')
        if keep_lock:
            return bool(int(keep_lock))

        return getattr(self._settings, 'KEEP_LOCK', False)

    @option
    def one_instance(self):
        """
        Configuration of the one instance lock. Instance of the
        :class:`OneInstanceConfig`.
        """
        return OneInstanceConfig(
            getattr(self._settings, 'ONE_INSTANCE', {}), self._args_parser)

    @option
    def liveness(self):
        """
        Configuration of the health state writer. Instance of the
        :class:`LivenessConfig`.
        """
        return LivenessConfig(
            getattr(self._settings, 'LIVENESS', {}), self._args_parser)

    @option
    def metrics(self):
        """
        Configuration of the metrics writer. Instance of the
        :class:`MetricsConfig`.
        """
        return MetricsConfig(
            getattr(self._settings, 'METRICS', {}), self._args_parser)
class OneInstanceConfig(ConfigGroup):
    """
    Configuration of the one instance lock.
    """

    @option
    def backend(self):
        """
        One instance lock implementation class. Value must be Python's
        module path ``[package.[submodule.]]module.ClassName``. For
        development purposes you can use
        ``jobslib.oneinstance.dummy.DummyLock``. If
        ``--disable-one-instance`` argument is passed,
        :class:`jobslib.oneinstance.dummy.DummyLock` will be forced.

        Otherwise a non-empty :envvar:`JOBSLIB_ONE_INSTANCE_BACKEND`
        environment variable wins over the ``backend`` settings key.
        """
        # The command line switch overrides every other source.
        if self._args_parser.disable_one_instance:
            return import_object('jobslib.oneinstance.dummy.DummyLock')

        backend_path = (
            os.environ.get('JOBSLIB_ONE_INSTANCE_BACKEND')
            or self._settings['backend']
        )
        return import_object(backend_path)

    @option
    def options(self):
        """
        Constructor's arguments of the one instance implementation class.
        It depends on :meth:`backend` attribute.
        """
        options_cls = self.backend.OptionsConfig
        backend_settings = self._settings.get('options', {})
        return options_cls(backend_settings, self._args_parser)
class LivenessConfig(ConfigGroup):
    """
    Configuration of the liveness writer.
    """

    @option
    def backend(self):
        """
        Liveness implementation class. A non-empty
        :envvar:`JOBSLIB_LIVENESS_BACKEND` environment variable is used
        first, then the ``backend`` settings key. If neither is defined,
        default value ``jobslib.liveness.dummy.DummyLiveness`` is used.
        """
        backend_path = (
            os.environ.get('JOBSLIB_LIVENESS_BACKEND')
            or self._settings.get(
                'backend', 'jobslib.liveness.dummy.DummyLiveness')
        )
        return import_object(backend_path)

    @option
    def options(self):
        """
        Constructor's arguments of the liveness implementation class.
        """
        options_cls = self.backend.OptionsConfig
        backend_settings = self._settings.get('options', {})
        return options_cls(backend_settings, self._args_parser)
class MetricsConfig(ConfigGroup):
    """
    Configuration of the metrics writer.
    """

    @option
    def backend(self):
        """
        Metrics implementation class. A non-empty
        :envvar:`JOBSLIB_METRICS_BACKEND` environment variable is used
        first, then the ``backend`` settings key. If neither is defined,
        default value ``jobslib.metrics.dummy.DummyMetrics`` is used.
        """
        backend_path = (
            os.environ.get('JOBSLIB_METRICS_BACKEND')
            or self._settings.get(
                'backend', 'jobslib.metrics.dummy.DummyMetrics')
        )
        return import_object(backend_path)

    @option
    def options(self):
        """
        Constructor's arguments of the metrics implementation class.
        """
        options_cls = self.backend.OptionsConfig
        backend_settings = self._settings.get('options', {})
        return options_cls(backend_settings, self._args_parser)
class RetryConfigMixin(object):
    """
    Mixin which adds retry related options. The class it is mixed into
    must provide the :attr:`retry_env_prefix` and :attr:`_settings`
    attributes, both of which are read by the options below.
    """

    @option(required=True, attrtype=int)
    def retry_max_attempts(self):
        """
        Number of attempts when some operation fails. Default is 0.

        The environment variable ``<retry_env_prefix>RETRY_MAX_ATTEMPTS``
        takes precedence over the ``retry_max_attempts`` settings key.
        """
        env_name = "{prefix}RETRY_MAX_ATTEMPTS".format(
            prefix=self.retry_env_prefix)
        raw_value = os.environ.get(env_name)
        if raw_value is None:
            # Variable is not set at all, fall back to settings.
            return self._settings.get('retry_max_attempts', 0)
        return int(raw_value)

    @option(required=True, attrtype=int)
    def retry_wait_multiplier(self):
        """
        Wait ``2^x * multiplier`` milliseconds between each retry. Default
        is 50. So waits between each retries will be 0.1 sec, 0.2 sec,
        0.4 sec, 0.8 sec, 1.6 sec, 3.2 sec, ...

        The environment variable
        ``<retry_env_prefix>RETRY_WAIT_MULTIPLIER`` takes precedence over
        the ``retry_wait_multiplier`` settings key.
        """
        env_name = "{prefix}RETRY_WAIT_MULTIPLIER".format(
            prefix=self.retry_env_prefix)
        raw_value = os.environ.get(env_name)
        if raw_value is None:
            # Variable is not set at all, fall back to settings.
            return self._settings.get('retry_wait_multiplier', 50)
        return int(raw_value)