Reference manual

Settings – basic configuration of your application

Basic configuration of your task is placed in the settings module. It is a plain Python module, which is passed to Jobslib using either the -s/--settings command line argument or the JOBSLIB_SETTINGS_MODULE environment variable. It contains a few options for basic application settings.

$ # Pass settings module using -s argument
$ runjob -s myapp.settings myapp.task.HelloWorld

$ # Pass settings module using environment variable
$ JOBSLIB_SETTINGS_MODULE=myapp.settings runjob myapp.task.HelloWorld

List of basic settings

Some options try to obtain their values from several sources: first from the command line argument, then from the environment variable and finally from the settings module. Environment variables are prefixed with JOBSLIB_.
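
For illustration, a minimal settings module for local development might look like the sketch below; the module name myapp.settings and the chosen values are only an example, and ONE_INSTANCE is the only required option (see settings.ONE_INSTANCE below).

# myapp/settings.py -- minimal development settings (illustrative sketch)

# Run the task only once instead of looping
RUN_ONCE = True

# DummyLock does no real locking; it is suitable for development only,
# see the One Instance Lock section
ONE_INSTANCE = {
    'backend': 'jobslib.oneinstance.dummy.DummyLock',
}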

settings.CONFIG_CLASS

Default: 'jobslib.Config'

Application configuration class. Jobslib provides a default Config class, which converts basic options from the settings module into instance attributes.

CONFIG_CLASS = 'myapplication.config.Config'
settings.CONTEXT_CLASS

Default: 'jobslib.Context'

Application context class. Context is a container for shared resources, e.g. a database connection. Jobslib provides a default Context class.

CONTEXT_CLASS = 'myapplication.context.Context'
--run-once
JOBSLIB_RUN_ONCE
settings.RUN_ONCE

Default: False

If set to True, indicates that the task will be run only once.

RUN_ONCE = True
--sleep-interval
JOBSLIB_SLEEP_INTERVAL
settings.SLEEP_INTERVAL

Default: 0

Sleep interval in seconds after the task is done.

SLEEP_INTERVAL = 60.0
--run-interval
JOBSLIB_RUN_INTERVAL
settings.RUN_INTERVAL

Default: 0

Run interval in seconds. If the task runs longer than this interval, the next loop is run immediately after the task is done.

RUN_INTERVAL = 60.0
--keep-lock
JOBSLIB_KEEP_LOCK
settings.KEEP_LOCK

Default: False

If set to True, indicates that the lock will be kept during sleeping.

KEEP_LOCK = True
--release-on-error
JOBSLIB_RELEASE_ON_ERROR
settings.RELEASE_ON_ERROR

Default: False

If set to True, indicates that the lock will be released on error (works together with --keep-lock).

Warning

If the lock is released on error, the job waits twice the sleep interval.

RELEASE_ON_ERROR = True
settings.LIVENESS

Default: {'backend': 'jobslib.liveness.dummy.DummyLiveness'}

Liveness implementation class. The value must be a dict containing a backend key, which is a Python module path [package.[submodule.]]module.ClassName. Alternatively, the JOBSLIB_LIVENESS_BACKEND environment variable can be used. If the value is not defined, the default jobslib.liveness.dummy.DummyLiveness is used. See jobslib.liveness.

LIVENESS = {
    'backend': 'jobslib.liveness.consul.ConsulLiveness',
    'options': {
        'host': 'hostname',
        'port': 8500,
        'timeout': 1.0,
        'key': 'jobs/example/liveness',
    },
}
settings.METRICS

Default: {'backend': 'jobslib.metrics.dummy.DummyMetrics'}

Metrics implementation class. The value must be a dict containing a backend key, which is a Python module path [package.[submodule.]]module.ClassName. Alternatively, the JOBSLIB_METRICS_BACKEND environment variable can be used. If the value is not defined, the default jobslib.metrics.dummy.DummyMetrics is used. See jobslib.metrics.

METRICS = {
    'backend': 'jobslib.metrics.influxdb.InfluxDBMetrics',
    'options': {
        'host': 'hostname',
        'port': 8086,
        'username': 'root',
        'password': 'root',
        'database': 'dbname',
    },
}
settings.ONE_INSTANCE

Default: no default value, required option

One instance lock implementation class. The value must be a dict containing a backend key, which is a Python module path [package.[submodule.]]module.ClassName. Alternatively, the JOBSLIB_ONE_INSTANCE_BACKEND environment variable can be used. For development purposes you can use jobslib.oneinstance.dummy.DummyLock. If the --disable-one-instance argument is passed, the dummy lock will be forced. See jobslib.oneinstance.

ONE_INSTANCE = {
    'backend': 'jobslib.oneinstance.consul.ConsulLock',
    'options': {
        'host': 'hostname',
        'port': 8500,
        'timeout': 1.0,
        'key': 'jobs/example/oneinstance/lock',
        'ttl': 30,
    }
}
settings.LOGGING
JOBSLIB_LOGGING

Default: root logger which logs to console:

Takes the logging configuration from a dictionary. See the logging.config module and the logging.config.dictConfig() function documentation.

{
    'version': 1,
    'disable_existing_loggers': False,
    'formatters': {
        'default': {
            'format': '%(asctime)s %(name)s %(levelname)s %(message)s',
            'datefmt': '%Y-%m-%d %H:%M:%S',
        },
    },
    'handlers': {
        'console': {
            'class': 'logging.StreamHandler',
            'level': 'NOTSET',
            'formatter': 'default',
        },
    },
    'root': {
        'handlers': ['console'],
        'level': 'INFO',
    },
}

If the logging configuration is passed in the environment variable, a JSON object is expected.
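
For illustration, the same can be done on the command line; the dictionary below is shortened to the bare minimum accepted by logging.config.dictConfig():

$ # Pass logging configuration as a JSON object (shortened example)
$ JOBSLIB_LOGGING='{"version": 1, "root": {"level": "INFO"}}' runjob -s myapp.settings myapp.task.HelloWorld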

Config – container for configuration

Module jobslib.config provides a base class which encapsulates configuration.

jobslib.argument(*args, **kwargs)[source]

Define how a single command-line argument should be parsed. args and kwargs have the same meaning as in the argparse.ArgumentParser.add_argument() method.

class jobslib.Config(settings, args_parser, task_cls)[source]

Class which encapsulates configuration. It joins configuration values from the settings module and from the command line. settings is a Python module defined by either the JOBSLIB_SETTINGS_MODULE environment variable or the -s/--settings command line argument. args_parser is an instance of argparse.Namespace. Both values are available on the class, settings as the _settings attribute and args_parser as the _args_parser attribute.

Configuration options are placed on the class as methods decorated by the option decorator. During class initialization all decorated methods are read, which implies validation and caching of the values. If you want nested configuration options, use ConfigGroup. Reading the value of a nested class during initialization applies the same mechanism, so all configuration will be validated and cached recursively.

Example of the custom Config class:

import os

from jobslib import Config, ConfigGroup, option

class AuthServiceConfig(ConfigGroup):

    @option(required=True, attrtype=str)
    def uri(self):
        # First try reading value from command line
        uri = self._args_parser.auth_service_uri
        if uri:
            return uri

        # Then try reading value from environment variable
        uri = os.environ.get('MYAPP_AUTH_SERVICE_URI')
        if uri is not None:
            return uri

        # Finally try reading value from settings
        return self._settings['uri']

class MyAppConfig(Config):

    @option
    def auth_service(self):
        return AuthServiceConfig(
            self._settings['AUTH_SERVICE'], self._args_parser)

And write into settings module:

CONFIG_CLASS = 'myapp.config.MyAppConfig'

AUTH_SERVICE = {
    'uri': 'http://example.com/api/v1/auth',
}

Configuration options are available on the Config as attributes. If any configuration value is not valid, an exception will be raised and the instance will not be created.

>>> context.config.auth_service.uri
'http://example.com/api/v1/auth'
initialize(*args, **kwargs)[source]

Initialize instance attributes. You can override this method in the subclasses.

logging[source]

Python’s logging configuration or BASE_LOGGING if the value is not defined. BASE_LOGGING allows INFO and higher level messages and forwards them to the console. The format is that accepted by logging.config.dictConfig().

context_class[source]

Context class, either Context class or subclass.

task_class[source]

Task class, subclass of the BaseTask.

run_once[source]

bool that indicates that the task will be run only once.

sleep_interval[source]

Sleep interval in seconds after the task is done.

run_interval[source]

Run interval in seconds. If the task runs longer than this interval, the next loop is run immediately after the task is done.

release_on_error[source]

bool that indicates that the lock will be released on task error.

keep_lock[source]

bool that indicates that the lock will be kept during sleeping.

one_instance[source]

Configuration of the one instance lock. Instance of the OneInstanceConfig.

liveness[source]

Configuration of the health state writer. Instance of the LivenessConfig.

Context – container for shared resources

Module jobslib.context provides a base class which encapsulates necessary resources (configuration, database connection, …) for tasks.

class jobslib.Context(config)[source]

Class which encapsulates resources (configuration, database connection, …). An instance of this class is created during task initialization, so in your task you have access to all necessary resources. Inherit this class and enrich it with the properties you need. Cache values using the cached_property() decorator according to your requirements.

Example of the custom Context class:

from xmlrpc.client import ServerProxy

from jobslib import Context, cached_property

class MyAppContext(Context):

    @cached_property
    def auth_service(self):
        return ServerProxy(uri=self.config.auth_service.uri)

And write into settings module:

CONTEXT_CLASS = 'myapp.context.MyAppContext'
initialize()[source]

Initialize instance attributes. You can override this method in the subclasses.

config

Application’s configuration, instance of the Config.

fqdn

Fully qualified domain name of the local machine as str.

one_instance_lock

One instance lock, instance of the jobslib.oneinstance.BaseLock descendant.

liveness

Health state writer, instance of the jobslib.liveness.BaseLiveness descendant.

metrics

Metrics writer, instance of the jobslib.metrics.BaseMetrics descendant.
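
For illustration, these shared resources are typically consumed inside a task (see BaseTask below) through self.context; the task name in this sketch is made up:

from jobslib import BaseTask

class ShowContextTask(BaseTask):

    name = 'showcontext'
    description = 'illustrative task reading shared resources from the context'

    def task(self):
        # fqdn and config are attributes of the Context instance
        self.logger.info(
            "Running on %s (run_once=%s)",
            self.context.fqdn, self.context.config.run_once)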

Task – class which encapsulates a task

Module tasks provides an ancestor class for writing tasks.

class jobslib.BaseTask(config)[source]

Ancestor for tasks. Inherit this class, adjust the name, description and optionally arguments attributes, and override the task() method. The constructor’s argument config is an instance of the Config class (or a descendant).

Several attributes are set during initialization. context is an instance of the Context. The configuration is available on the context as the Context.config attribute. logger is an instance of logging.Logger. stdout and stderr are file-like objects for standard output and error.

from jobslib import BaseTask, argument

class HelloWorldTask(BaseTask):

    name = 'hello'
    description = 'prints hello world'
    arguments = (
        argument('--to-stderr', action='store_true', default=False,
                 help='use stderr instead of stdout'),
    )

    def task(self):
        self.logger.info("Hello world")
        if self.context.config.to_stderr:
            self.stderr("Hello world\n")
            self.stderr.flush()
        else:
            self.stdout("Hello world\n")
            self.stdout.flush()
name = ''

Task name.

description = ''

Task description.

arguments = ()

Task command line arguments, a tuple containing command line arguments. Each argument is defined using the argument() function.

arguments = (
    argument('-f', '--file', action='store', dest='filename'),
)
task()[source]

Task body, override this method.

extend_lock()[source]

Refresh the existing lock. Return True if the lock has been successfully refreshed, otherwise return False. See jobslib.oneinstance.
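
For illustration, a long-running task might refresh the lock periodically so that the TTL is not reached; load_batches() and process() in this sketch are hypothetical helpers, not part of Jobslib:

from jobslib import BaseTask

class LongRunningTask(BaseTask):

    name = 'longtask'
    description = 'illustrative task which extends the one instance lock'

    def task(self):
        for batch in self.load_batches():  # hypothetical helper
            self.process(batch)            # hypothetical helper
            # Extend the lock after each batch so that its TTL is not reached
            if not self.extend_lock():
                self.logger.error("Unable to refresh the lock, aborting")
                return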

Liveness – information about the health state of the task

Module liveness provides functionality for exporting information about the health state of the task. When the task finishes successfully, some state is written. BaseLiveness is the ancestor; it is an abstract class which defines the API, not the functionality. Override this class if you want to write your own liveness implementation.

class jobslib.liveness.BaseLiveness(context, options)[source]

Provides the liveness API. Inherit this class and override the abstract methods read() and write(). Configuration options are defined in the OptionsConfig class, which is a ConfigGroup descendant.

class OptionsConfig(*args, **kwargs)[source]

Validation of the liveness configuration, see ConfigGroup.

check(max_age)[source]

Check the liveness and return True when the liveness timestamp is younger than max_age, False when it is older than max_age.

get_state()[source]

Return health state as a str.

abstract read()[source]

Read information about the health state of the task.

abstract write()[source]

Write information about the health state of the task.
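
As a sketch only, a custom backend could store the health state in a local file; the file path below is hard-coded for brevity, a real implementation would take it from the options configuration:

import os

from jobslib.liveness import BaseLiveness

class FileLiveness(BaseLiveness):

    filename = '/tmp/myapp.liveness'  # hypothetical, hard-coded for brevity

    def read(self):
        # Return the previously written health state, or None if missing
        if not os.path.exists(self.filename):
            return None
        with open(self.filename) as f:
            return f.read()

    def write(self):
        # get_state() provides the current health state as a str (see above)
        with open(self.filename, 'w') as f:
            f.write(self.get_state())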

class jobslib.liveness.dummy.DummyLiveness(context, options)[source]

Dummy liveness implementation. It doesn’t provide real functionality; it is useful for development or when liveness is not necessary. To use DummyLiveness, write into settings:

LIVENESS = {
    'backend': 'jobslib.liveness.dummy.DummyLiveness',
}
class jobslib.liveness.consul.ConsulLiveness(context, options)[source]

Consul liveness implementation. Exports information about the health state into Consul’s key/value storage.

To use ConsulLiveness, write into settings:

LIVENESS = {
    'backend': 'jobslib.liveness.consul.ConsulLiveness',
    'options': {
        'host': 'hostname',
        'port': 8500,
        'timeout': 1.0,
        'key': 'jobs/example/liveness',
        'retry_max_attempts': 10,
        'retry_wait_multiplier': 50,
    },
}

Or use JOBSLIB_LIVENESS_CONSUL_HOST, JOBSLIB_LIVENESS_CONSUL_PORT, JOBSLIB_LIVENESS_CONSUL_TIMEOUT, JOBSLIB_LIVENESS_CONSUL_KEY, JOBSLIB_LIVENESS_CONSUL_RETRY_MAX_ATTEMPTS and JOBSLIB_LIVENESS_CONSUL_RETRY_WAIT_MULTIPLIER environment variables.

class OptionsConfig(*args, **kwargs)[source]

Consul liveness options.

host[source]

IP address or hostname of the Consul server.

key[source]

Key under which the health state is stored.

port[source]

Port on which the Consul server is listening.

scheme[source]

URI scheme, in current implementation always http.

timeout[source]

Timeout in seconds for connect/read/write operation.

Metrics – task metrics

class jobslib.metrics.BaseMetrics(context, options)[source]

Provides the metrics API. Inherit this class and override the abstract method push(). Configuration options are defined in the OptionsConfig class, which is a ConfigGroup descendant.

class OptionsConfig(*args, **kwargs)[source]

Validation of the metrics configuration, see ConfigGroup.

abstract push(metrics)[source]

Push metrics. metrics is a dict where the key is the metric name and the value is a dict containing value and optionally tags.

metrics = {
    "flow_velocity": {
        "value": 30.0,
        "tags": {
            "tag_name1": "tag_value1",
            "tag_name2": "tag_value2",
        },
    },
    "processed_requests": {
        "value": 5,
        "tags": {
            "tag_name3": "tag_value3",
        },
    },
}
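
For illustration, metrics are typically pushed from a task through the metrics writer available on the context; the metric name and the do_import() helper in this sketch are made up:

from jobslib import BaseTask

class ImportTask(BaseTask):

    name = 'import'
    description = 'illustrative task which pushes metrics'

    def task(self):
        processed = self.do_import()  # hypothetical helper returning a count
        # Push one metric with a tag, in the structure shown above
        self.context.metrics.push({
            'processed_items': {
                'value': processed,
                'tags': {'task': self.name},
            },
        })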
class jobslib.metrics.dummy.DummyMetrics(context, options)[source]

Dummy metrics implementation. It doesn’t provide real functionality; it is useful for development or when metrics are not necessary. To use DummyMetrics, write into settings:

METRICS = {
    'backend': 'jobslib.metrics.dummy.DummyMetrics',
}
class jobslib.metrics.influxdb.InfluxDBMetrics(context, options)[source]

InfluxDB metrics implementation.

To use InfluxDBMetrics, write into settings:

METRICS = {
    'backend': 'jobslib.metrics.influxdb.InfluxDBMetrics',
    'options': {
        'host': 'hostname',
        'port': 8086,
        'username': 'root',
        'password': 'root',
        'database': 'dbname',
        'retry_max_attempts': 10,
        'retry_wait_multiplier': 50,
    },
}

Or use JOBSLIB_METRICS_INFLUXDB_HOST, JOBSLIB_METRICS_INFLUXDB_PORT, JOBSLIB_METRICS_INFLUXDB_USERNAME, JOBSLIB_METRICS_INFLUXDB_PASSWORD, JOBSLIB_METRICS_INFLUXDB_DBNAME, JOBSLIB_METRICS_INFLUXDB_RETRY_MAX_ATTEMPTS and JOBSLIB_METRICS_INFLUXDB_RETRY_WAIT_MULTIPLIER environment variables.

One Instance Lock – only one running instance at the same time

Module oneinstance provides a lock which allows only one running instance at the same time. The lock is used when the --one-instance command line argument is passed. When the lock cannot be acquired, the task is not run and the process sleeps for --sleep-interval seconds. Then runjob tries to acquire the lock again. If the lock implementation supports TTL and you need to extend the lock, it is possible to call BaseLock.refresh() inside your jobslib.BaseTask.task(). Otherwise the task is aborted.

BaseLock is the ancestor; it is an abstract class which defines the API, not the locking functionality. Override this class if you want to write your own lock implementation.

class jobslib.oneinstance.BaseLock(context, options)[source]

Provides the lock’s API. Inherit this class and override the abstract methods acquire(), release() and refresh(). Configuration options are defined in the OptionsConfig class, which is a ConfigGroup descendant.

class OptionsConfig(*args, **kwargs)[source]

Validation of the lock’s configuration, see ConfigGroup.

abstract acquire()[source]

Acquire a lock. Return True if the lock has been successfully acquired, otherwise return False.

get_lock_owner_info()[source]

Return the lock owner’s information. Depending on the implementation, return a dict, or None if the information is not available.

abstract refresh()[source]

Refresh the existing lock. Return True if the lock has been successfully refreshed, otherwise return False.

abstract release()[source]

Release the existing lock. Return True if the lock has been successfully released, otherwise return False.

exception jobslib.oneinstance.OneInstanceWatchdogError[source]

Indicates that TTL of the lock has been reached.

class jobslib.oneinstance.dummy.DummyLock(context, options)[source]

Dummy lock implementation. It doesn’t provide real locking; all methods always return True. It is useful for development or when it is not necessary to run only one instance at the same time. To use DummyLock, configure the backend in settings:

ONE_INSTANCE = {
    'backend': 'jobslib.oneinstance.dummy.DummyLock',
}
class jobslib.oneinstance.consul.ConsulLock(context, options)[source]

Consul lock implementation, provides locking among datacenters. When the lock expires due to TTL, OneInstanceWatchdogError is raised. It is possible to extend the lock using refresh(). The lock is not extended immediately; a request for extension is made and the lock is extended asynchronously when SIGALRM is received. So use a conservative TTL and extend the lock periodically.

Warning

ConsulLock uses signal and signal.SIGALRM for the TTL mechanism, so don’t use SIGALRM in your task. And don’t use multiple instances of ConsulLock at the same time, because SIGALRM can’t be shared among multiple instances.

To use ConsulLock, configure the backend in settings:

ONE_INSTANCE = {
    'backend': 'jobslib.oneinstance.consul.ConsulLock',
    'options': {
        'host': 'hostname',
        'port': 8500,
        'timeout': 1.0,
        'key': 'jobs/example/lock',
        'ttl': 60.0,
        'lock_delay': 15.0,
        'retry_max_attempts': 10,
        'retry_wait_multiplier': 50,
    },
}

Or use JOBSLIB_ONE_INSTANCE_CONSUL_HOST, JOBSLIB_ONE_INSTANCE_CONSUL_PORT, JOBSLIB_ONE_INSTANCE_CONSUL_TIMEOUT, JOBSLIB_ONE_INSTANCE_CONSUL_KEY, JOBSLIB_ONE_INSTANCE_CONSUL_TTL, JOBSLIB_ONE_INSTANCE_CONSUL_LOCK_DELAY, JOBSLIB_ONE_INSTANCE_CONSUL_RETRY_MAX_ATTEMPTS and JOBSLIB_ONE_INSTANCE_CONSUL_RETRY_WAIT_MULTIPLIER environment variables.

class OptionsConfig(*args, **kwargs)[source]

Consul lock options.

host[source]

IP address or hostname of the Consul server.

key[source]

Key under which the lock is stored.

lock_delay[source]

When a session invalidation request is received, Consul waits lock_delay seconds before the session is truly invalidated. The value must be between 0 and 60 seconds; the default is 1 second.

port[source]

Port on which the Consul server is listening.

scheme[source]

URI scheme, in current implementation always http.

timeout[source]

Timeout in seconds for connect/read/write operation.

ttl[source]

Maximum lock lifespan in seconds, must be between 10 seconds and one day. If the value is omitted, the default is one day (the maximum for Consul).