Source code for jobslib.tasks
"""
Module :mod:`jobslib.tasks` provides an ancestor class for writing tasks.
"""
import enum
import logging
import signal
import sys
import time
from .exceptions import Terminate
from .oneinstance import OneInstanceWatchdogError
from .time import get_current_time
__all__ = ['BaseTask']
@enum.unique
class JobStatus(enum.Enum):
    """
    Outcome of a single task run.

    The string value of each member is what gets reported as the
    ``status`` tag of the ``job_duration_seconds`` metric, so the values
    must stay unique.
    """

    # Initial state, before the outcome of the run is known.
    UNKNOWN = 'unknown'
    # Task body finished without raising.
    SUCCEEDED = 'succeeded'
    # Task body raised an unexpected exception.
    FAILED = 'failed'
    # Lock could not be acquired, task body did not run.
    PENDING = 'pending'
    # Lock expired while the task was running.
    INTERRUPTED = 'interrupted'
    # Task was terminated by a signal.
    KILLED = 'killed'
class BaseTask(object):
    """
    Ancestor for task. Inherit this class and adjust :attr:`name`,
    :attr:`description` and optionally :attr:`arguments` attributes and
    override :meth:`task` method. Constructor's argument *config* is
    instance of the :class:`~jobslib.Config` (or descendant).

    There are several attributes which are set during initialization.
    :attr:`context` is instance of the :class:`~jobslib.Context`.
    Configuration is available on context as :attr:`Context.config`
    attribute. :attr:`logger` is instance of the :class:`logging.Logger`.
    :attr:`stdout` and :attr:`stderr` are file-like objects for standard
    output and error.

    .. code-block:: python

        from jobslib import BaseTask, argument

        class HelloWorldTask(BaseTask):

            name = 'hello'
            description = 'prints hello world'
            arguments = (
                argument('--to-stderr', action='store_true', default=False,
                         help='use stderr instead of stdout'),
            )

            def task(self):
                self.logger.info("Hello world")
                if self.context.config.to_stderr:
                    self.stderr.write("Hello world\\n")
                    self.stderr.flush()
                else:
                    self.stdout.write("Hello world\\n")
                    self.stdout.flush()
    """

    name = ''
    """
    Task name.
    """

    description = ''
    """
    Task description.
    """

    arguments = ()
    """
    Task command line arguments. :class:`tuple` containing command line
    arguments. Each argument is defined using :func:`~jobslib.argument`
    function.

    .. code-block:: python

        arguments = (
            argument('-f', '--file', action='store', dest='filename'),
        )
    """

    def __init__(self, config):
        """
        Build the task's context from *config*, set up :attr:`logger`
        (named ``<module>.<class name>``), bind :attr:`stdout` and
        :attr:`stderr` to the process streams and finally call
        :meth:`initialize`.
        """
        self.context = config.context_class.from_config(config)
        self.logger = logging.getLogger(
            '{}.{}'.format(self.__class__.__module__, self.__class__.__name__))
        self.stdout = sys.stdout
        self.stderr = sys.stderr
        self.initialize()

    def __call__(self):
        """
        Run the task, either once or repeatedly in a loop.

        Each iteration tries to acquire the one-instance lock. When the
        lock is acquired, :meth:`task` is run with *SIGTERM* and *SIGINT*
        handlers set to :meth:`terminate_process`; afterwards liveness is
        written. When the lock is not acquired, the run is only logged.
        In every case the ``job_duration_seconds`` metric (tagged with
        the run status) is pushed, plus ``last_successful_run_timestamp``
        after a successful run.

        If ``run_once`` is set in the configuration, the loop ends after
        the first iteration and :exc:`OneInstanceWatchdogError` and any
        other :exc:`Exception` raised by the task are re-raised.
        Otherwise these are logged and the loop sleeps before the next
        run. :exc:`Terminate` is always re-raised.
        """
        self.context.config._configure_logging()

        lock = self.context.one_instance_lock
        liveness = self.context.liveness
        metrics = self.context.metrics

        while True:
            start_time = time.time()
            last_successful_run_timestamp = None
            job_status = JobStatus.UNKNOWN
            # Read per iteration; *keep_lock* is forced to False below
            # when the lock could not be acquired.
            keep_lock = self.context.config.keep_lock
            release_on_error = self.context.config.release_on_error

            try:
                if lock.acquire():
                    terminate = False
                    try:
                        self.logger.info("Run task")
                        signal.signal(signal.SIGTERM, self.terminate_process)
                        signal.signal(signal.SIGINT, self.terminate_process)
                        try:
                            self.task()
                        finally:
                            # Handlers are reset to SIG_DFL, not to the
                            # handlers installed before the task started.
                            signal.signal(signal.SIGTERM, signal.SIG_DFL)
                            signal.signal(signal.SIGINT, signal.SIG_DFL)
                        self.logger.info("Task done")
                    except Terminate:
                        terminate = True
                        raise
                    finally:
                        # The lock is kept (refreshed) only when another
                        # iteration will follow; otherwise it is released.
                        if (keep_lock
                                and not self.context.config.run_once
                                and not terminate):
                            lock.refresh()
                        else:
                            lock.release()
                    # Reached only when the task body did not raise.
                    liveness.write()
                    job_status = JobStatus.SUCCEEDED
                    last_successful_run_timestamp = get_current_time()
                else:
                    lock_owner_info = lock.get_lock_owner_info()
                    if lock_owner_info:
                        self.logger.info(
                            "Can't acquire lock (lock owner is %s, "
                            "locked at %s UTC)", lock_owner_info.get('fqdn'),
                            lock_owner_info.get('time_utc'))
                    else:
                        self.logger.info("Can't acquire lock")
                    # We do not own the lock, so there is nothing to keep.
                    keep_lock = False
                    job_status = JobStatus.PENDING
            except OneInstanceWatchdogError:
                dur = time.time() - start_time
                self.logger.exception("Lock has expired after %d seconds", dur)
                job_status = JobStatus.INTERRUPTED
                if self.context.config.run_once:
                    raise
            except Terminate:
                self.logger.warning("Task has been terminated")
                job_status = JobStatus.KILLED
                raise
            except Exception:
                self.logger.exception("%s task failed", self.name)
                job_status = JobStatus.FAILED
                if self.context.config.run_once:
                    raise
            finally:
                metrics_data = {
                    'job_duration_seconds': {
                        'value': time.time() - start_time,
                        'tags': {
                            'status': job_status.value,
                            'type': 'task',
                        },
                    },
                }
                if last_successful_run_timestamp is not None:
                    # Report the moment the run succeeded, i.e. the value
                    # captured above, not the time of the metrics push.
                    metrics_data['last_successful_run_timestamp'] = {
                        'value': last_successful_run_timestamp,
                    }
                metrics.push(metrics_data)

            if self.context.config.run_once:
                break

            # A configured sleep_interval is a fixed pause; otherwise
            # sleep only for what remains of run_interval since the start.
            if self.context.config.sleep_interval:
                sleep_time = self.context.config.sleep_interval
            else:
                next_run = start_time + self.context.config.run_interval
                sleep_time = max(next_run - time.time(), 0)

            failed_and_release = \
                (job_status == JobStatus.FAILED) and release_on_error

            if keep_lock and not failed_and_release:
                self.logger.info(
                    "Sleep for %d seconds, lock is kept", sleep_time)
                signal.signal(signal.SIGTERM, self.terminate_process)
                signal.signal(signal.SIGINT, self.terminate_process)
                try:
                    sleep_stop_time = time.time() + sleep_time
                    try:
                        # Sleep in one-second steps, refreshing the lock
                        # before each step.
                        while time.time() < sleep_stop_time:
                            lock.refresh()
                            time.sleep(1)
                    finally:
                        lock.release()
                finally:
                    signal.signal(signal.SIGTERM, signal.SIG_DFL)
                    signal.signal(signal.SIGINT, signal.SIG_DFL)
            else:
                if failed_and_release:
                    # We need to wait 2 * sleep_time, because another
                    # instance needs time to take the lock.
                    sleep_time = sleep_time * 2
                    lock.release()
                self.logger.info("Sleep for %d seconds", sleep_time)
                time.sleep(sleep_time)

    def initialize(self):
        """
        Initialize instance attributes. You can override this method in
        the subclasses.
        """
        pass

    def task(self):
        """
        Task body, override this method.
        """
        raise NotImplementedError

    def terminate_process(self, unused_signal_number, unused_frame):
        """
        Signal handler installed for *SIGTERM* and *SIGINT*; raises
        :exc:`Terminate`. Both arguments are ignored.
        """
        raise Terminate

    def extend_lock(self):
        """
        Refresh existing lock. Return :data:`!True` if lock has been
        successfully refreshed, otherwise return :data:`!False`. See
        :mod:`jobslib.oneinstance`.
        """
        return self.context.one_instance_lock.refresh()
class _Task(BaseTask):
    """
    Ancestor for internal task. Only for internal usage.
    """

    def __call__(self):
        """
        Configure logging and run :meth:`task` exactly once.

        This override calls :meth:`task` directly: no lock is acquired,
        no signal handlers are installed and no liveness or metrics are
        written here.
        """
        self.context.config._configure_logging()
        self.task()